8 #include <framework/pcore/PathUtils.h>
9 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
10 #include <framework/core/ModuleManager.h>
11 #include <framework/core/Environment.h>
19 std::set<std::string> uselessParallelModules({
"HistoManager",
"Gearbox",
"Geometry"});
26 for (
const ModulePtr& module : path->getModules()) {
29 if (hasParallelFlag and module->hasCondition()) {
30 for (
const auto& conditionPath : module->getAllConditionPaths()) {
32 hasParallelFlag =
false;
37 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
41 bool path_is_useful =
false;
42 for (
const auto& parallelModule : mainPath->getModules()) {
43 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
44 path_is_useful =
true;
48 if (not path_is_useful) {
50 inputPath->addPath(mainPath);
51 mainPath.reset(
new Path);
59 inputPath->addModule(module);
60 }
else if (stage == 1) {
61 mainPath->addModule(module);
62 }
else if (stage == 2) {
63 outputPath->addModule(module);
67 bool createAllPaths =
false;
68 for (
const ModulePtr& module : path->getModules()) {
70 createAllPaths =
true;
75 if (mainPath->isEmpty() and not createAllPaths) {
78 if (inputPath->isEmpty() and not createAllPaths) {
81 if (outputPath->isEmpty() and not createAllPaths) {
85 return {inputPath, mainPath, outputPath};
91 for (
const ModulePtr& module : inputPath->getModules()) {
94 histoManagerModule = module;
97 mainPath->addModule(histoManagerModule);
98 outputPath->addModule(histoManagerModule);
102 return histoManagerModule;
106 B2ASSERT(
"The main part is empty. This is a bug in the framework.",
107 mainPath and not mainPath->isEmpty());
113 const auto& socketAddress = environment.getZMQSocketAddress();
119 unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
120 unsigned int eventBufferSize = environment.getZMQEventBufferSize();
121 unsigned int workerTimeout = environment.getZMQWorkerTimeout();
122 bool useEventBackup = environment.getZMQUseEventBackup();
127 zmqTxInputModule->getParam<std::string>(
"socketName").setValue(inputSocketAddress);
128 zmqTxInputModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
129 zmqTxInputModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
130 zmqTxInputModule->getParam<
unsigned int>(
"workerProcessTimeout").setValue(workerTimeout);
131 zmqTxInputModule->getParam<
bool>(
"useEventBackup").setValue(useEventBackup);
132 zmqTxInputModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
137 zmqRxWorkerModule->getParam<std::string>(
"socketName").setValue(inputSocketAddress);
138 zmqRxWorkerModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
139 zmqRxWorkerModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
140 zmqRxWorkerModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
141 zmqRxWorkerModule->getParam<
unsigned int>(
"eventBufferSize").setValue(eventBufferSize);
148 zmqTxWorkerModule->getParam<std::string>(
"socketName").setValue(outputSocketAddress);
149 zmqTxWorkerModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
150 zmqTxWorkerModule->getParam<std::string>(
"xsubProxySocketName").setValue(pubSocketAddress);
155 zmqRxOutputModule->getParam<std::string>(
"socketName").setValue(outputSocketAddress);
156 zmqRxOutputModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
157 zmqRxOutputModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
158 zmqRxOutputModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
163 B2INFO(
"Input Path " << inputPath->getPathString());
166 B2INFO(
"Main Path " << mainPath->getPathString());
169 B2INFO(
"Output Path " << outputPath->getPathString());
178 mergedPath.
addPath(outputPath);
188 tmpModuleList.push_back(m);
190 return tmpModuleList;
195 for (
const ModulePtr& m : prependModules) {
196 if (std::find(modules->begin(), modules->end(), m) == modules->end()) {
197 modules->push_front(m);
204 path->addModule(module);
210 newPath->addModule(module);
211 newPath->addPath(path);
static Environment & Instance()
Static method to get a reference to the Environment instance.
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes.
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given path into the input, main and output path (in this order) by looking onto the paralle...
static ModulePtr getHistogramManager(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Find the histogram manager in the paths and return it.
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
static void prependModule(PathPtr &path, const ModulePtr &module)
Prepend the given module to the path.
static void appendModule(PathPtr &path, const ModulePtr &module)
Append the given module to the path.
Implements a path consisting of Module and/or Path objects.
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
@ c_sub
Multicast publish socket.
Abstract base class for different kinds of events.