 |
Belle II Software
release-05-02-19
|
10 #include <framework/pcore/ProcessMonitor.h>
11 #include <framework/core/EventProcessor.h>
12 #include <framework/pcore/GlobalProcHandler.h>
14 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
15 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
23 const std::string& controlSocketAddress)
32 zmq::context_t context(1);
34 zmq::socket_t pubSocket(context, ZMQ_XPUB);
36 pubSocket.bind(subSocketAddress);
37 pubSocket.setsockopt(ZMQ_LINGER, 0);
39 zmq::socket_t subSocket(context, ZMQ_XSUB);
41 subSocket.bind(pubSocketAddress);
42 subSocket.setsockopt(ZMQ_LINGER, 0);
44 zmq::socket_t controlSocket(context, ZMQ_SUB);
45 controlSocket.bind(controlSocketAddress);
46 controlSocket.setsockopt(ZMQ_LINGER, 0);
47 controlSocket.setsockopt(ZMQ_SUBSCRIBE,
"", 0);
49 B2DEBUG(10,
"Will now start the proxy..");
53 zmq::proxy_steerable(pubSocket, subSocket,
nullptr, controlSocket);
55 }
catch (zmq::error_t& ex) {
56 if (ex.num() != EINTR) {
57 B2ERROR(
"There was an error during the proxy event: " << ex.what());
62 controlSocket.close();
66 B2DEBUG(10,
"Proxy has finished");
71 std::this_thread::sleep_for(std::chrono::milliseconds(500));
74 m_client.
initialize<ZMQ_PUB>(pubSocketAddress, subSocketAddress, controlSocketAddress,
false);
80 B2DEBUG(10,
"Started multicast publishing on " << pubSocketAddress <<
" and subscribing on " << subSocketAddress);
99 B2DEBUG(10,
"Try to kill the processes gently...");
106 const auto multicastAnswer = [
this](
const auto & socket) {
110 B2DEBUG(10,
"Process pid " << pair.first <<
" of type " <<
static_cast<char>(pair.second) <<
" is still alive");
117 bool allProcessesStopped =
true;
120 allProcessesStopped =
false;
125 if (not allProcessesStopped) {
126 B2DEBUG(10,
"Start waiting for processes to go down.");
128 B2DEBUG(10,
"Finished waiting for processes to go down.");
133 B2DEBUG(10,
"Will kill the proxy now.");
137 std::this_thread::sleep_for(std::chrono::milliseconds(10));
147 const auto multicastAnswer = [
this](
const auto & socket) {
153 if (not pullResult) {
154 B2ERROR(
"Input process did not start properly!");
163 const auto multicastAnswer = [
this](
const auto & socket) {
169 if (not pullResult) {
170 B2ERROR(
"Worker process did not start properly!");
179 const auto multicastAnswer = [
this](
const auto & socket) {
185 if (not pullResult) {
186 B2ERROR(
"Output process did not start properly!");
203 const auto multicastAnswer = [
this](
const auto & socket) {
210 template <
class ASocket>
213 auto pcbMulticastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
214 if (pcbMulticastMessage->isMessage(EMessageTypes::c_helloMessage)) {
215 const int pid = std::stoi(pcbMulticastMessage->getData());
221 }
else if (pcbMulticastMessage->isMessage(EMessageTypes::c_goodbyeMessage)) {
222 const int pid = std::stoi(pcbMulticastMessage->getData());
225 B2WARNING(
"An unknown PID died!");
228 const ProcType procType = processIt->second;
237 }
else if (pcbMulticastMessage->isMessage(EMessageTypes::c_killWorkerMessage)) {
238 const int workerPID = atoi(pcbMulticastMessage->getData().c_str());
239 B2DEBUG(10,
"Got message to kill worker " << workerPID);
240 if (kill(workerPID, SIGKILL) == 0) {
241 B2WARNING(
"Needed to kill worker " << workerPID <<
" as it was requested.");
243 B2ERROR(
"Try to kill process " << workerPID <<
", but process is already gone.");
246 }
else if (pcbMulticastMessage->isMessage(EMessageTypes::c_statisticMessage)) {
248 B2DEBUG(10,
"Having received the process statistics");
263 const auto& pair = *iter;
265 B2ASSERT(
"This pid should not be in our list!", pair.first != 0);
267 if (std::find(currentProcessList.begin(), currentProcessList.end(), pair.first) != currentProcessList.end()) {
274 B2ERROR(
"An input process has died unexpected! Need to go down.");
278 B2ERROR(
"An output process has died unexpected! Need to go down.");
282 B2ERROR(
"A proxy process has died unexpected! Need to go down.");
286 B2WARNING(
"A worker process has died unexpected. If you have requested, I will now restart the workers.");
291 B2DEBUG(10,
"An children process has died expectedly.");
317 B2DEBUG(10,
"No input and no output process around. Will go home now!");
323 if (g_signalReceived > 0) {
324 B2DEBUG(10,
"Received signal " << g_signalReceived);
342 if (neededWorkers < 0) {
343 B2FATAL(
"Something went completely wrong here! I have more workers as requested...");
345 if (neededWorkers > 0) {
346 B2DEBUG(10,
"I need to restart " << neededWorkers <<
" workers");
348 return static_cast<unsigned int>(neededWorkers);
353 auto correctProcType = [&procType](
const auto & pair) {
354 return pair.second == procType;
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
@ c_Monitor
Monitoring Process.
bool m_hasEnded
Someone requested us to end the processing.
@ c_Output
Output Process.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
static const std::vector< int > & getPIDList()
Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the sign...
std::map< int, ProcType > m_processList
The current list of pid -> process types (to be compared to the proc handler)
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
bool m_receivedStatistics
Did we already receive the statistics?
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
bool hasWorkers() const
Check if there is at least one running worker.
static ProcType getProcType(int pid)
Return the proc type of this process.
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
unsigned int m_requestedNumberOfWorkers
How many workers we should request to start.
void terminate()
Terminate the processing.
static bool startProxyProcess()
Fork and initialize a proxy process.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
@ c_Init
Before the forks, the process is in init state.
@ c_Stopped
The process is stopped/killed.
bool isOnline() const
Check if the client was initialized and not terminated.
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is raised.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll only the multicast socket.
@ c_Worker
Worker/Reconstruction Process.
void reset()
Reset the internal state.
unsigned int processesWithType(const ProcType &procType) const
Count the number of processes with a certain type.
Abstract base class for different kinds of events.
static void killAllProcesses()
Hard kill all processes.
void send(AZMQMessage message) const
Send a message over the data socket.
void processMulticast(const ASocket &socket)
Process a message from the multicast.
void publish(AZMQMessage message) const
Publish the message to the multicast.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
void checkMulticast(int timeout=0)
Check multicast for messages and kill workers if requested.
unsigned int needMoreWorkers() const
Compare our current list of workers to how many we want to have.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is raised.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
void checkSignals(int g_signalReceived)
Check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is raised.
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If not, kill them after timeout seconds.
@ c_Proxy
Multicast Proxy Process.
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
ZMQClient m_client
The client used for message processing.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
StreamHelper m_streamer
The data store streamer.
void checkChildProcesses()
Check the child processes to see whether one has died.