8 #include <framework/pcore/ProcessMonitor.h> 
    9 #include <framework/core/EventProcessor.h> 
   10 #include <framework/pcore/GlobalProcHandler.h> 
   12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h> 
   13 #include <framework/pcore/zmq/messages/ZMQDefinitions.h> 
   21                                const std::string& controlSocketAddress)
 
   30     zmq::context_t context(1);
 
   32     zmq::socket_t pubSocket(context, ZMQ_XPUB);
 
   34     pubSocket.bind(subSocketAddress);
 
   35     pubSocket.set(zmq::sockopt::linger, 0);
 
   37     zmq::socket_t subSocket(context, ZMQ_XSUB);
 
   39     subSocket.bind(pubSocketAddress);
 
   40     subSocket.set(zmq::sockopt::linger, 0);
 
   42     zmq::socket_t controlSocket(context, ZMQ_SUB);
 
   43     controlSocket.bind(controlSocketAddress);
 
   44     controlSocket.set(zmq::sockopt::linger, 0);
 
   45     controlSocket.set(zmq::sockopt::subscribe, 
"");
 
   47     B2DEBUG(10, 
"Will now start the proxy..");
 
   51         zmq::socket_ref nullsocket;
 
   52         zmq::proxy_steerable(pubSocket, subSocket, nullsocket, controlSocket);
 
   54       } 
catch (zmq::error_t& ex) {
 
   55         if (ex.num() != EINTR) {
 
   56           B2ERROR(
"There was an error during the proxy event: " << ex.what());
 
   61     controlSocket.close();
 
   65     B2DEBUG(10, 
"Proxy has finished");
 
   70   std::this_thread::sleep_for(std::chrono::milliseconds(500));
 
   73   m_client.
initialize<ZMQ_PUB>(pubSocketAddress, subSocketAddress, controlSocketAddress, 
false);
 
   79   B2DEBUG(10, 
"Started multicast publishing on " << pubSocketAddress << 
" and subscribing on " << subSocketAddress);
 
   98     B2DEBUG(10, 
"Try to kill the processes gently...");
 
  105     const auto multicastAnswer = [
this](
const auto & socket) {
 
  109           B2DEBUG(10, 
"Process pid " << pair.first << 
" of type " << 
static_cast<char>(pair.second) << 
" is still alive");
 
  116     bool allProcessesStopped = 
true;
 
  119         allProcessesStopped = 
false;
 
  124     if (not allProcessesStopped) {
 
  125       B2DEBUG(10, 
"Start waiting for processes to go down.");
 
  127       B2DEBUG(10, 
"Finished waiting for processes to go down.");
 
  132     B2DEBUG(10, 
"Will kill the proxy now.");
 
  136     std::this_thread::sleep_for(std::chrono::milliseconds(10));
 
  146     const auto multicastAnswer = [
this](
const auto & socket) {
 
  152     if (not pullResult) {
 
  153       B2ERROR(
"Input process did not start properly!");
 
  162     const auto multicastAnswer = [
this](
const auto & socket) {
 
  168     if (not pullResult) {
 
  169       B2ERROR(
"Worker process did not start properly!");
 
  178     const auto multicastAnswer = [
this](
const auto & socket) {
 
  184     if (not pullResult) {
 
  185       B2ERROR(
"Output process did not start properly!");
 
  202   const auto multicastAnswer = [
this](
const auto & socket) {
 
  209 template <
class ASocket>
 
  212   auto pcbMulticastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
 
  213   if (pcbMulticastMessage->isMessage(EMessageTypes::c_helloMessage)) {
 
  214     const int pid = std::stoi(pcbMulticastMessage->getData());
 
  220   } 
else if (pcbMulticastMessage->isMessage(EMessageTypes::c_goodbyeMessage)) {
 
  221     const int pid = std::stoi(pcbMulticastMessage->getData());
 
  224       B2WARNING(
"An unknown PID died!");
 
  227     const ProcType procType = processIt->second;
 
  236   } 
else if (pcbMulticastMessage->isMessage(EMessageTypes::c_killWorkerMessage)) {
 
  237     const int workerPID = atoi(pcbMulticastMessage->getData().c_str());
 
  238     B2DEBUG(10, 
"Got message to kill worker " << workerPID);
 
  239     if (kill(workerPID, SIGKILL) == 0) {
 
  240       B2WARNING(
"Needed to kill worker  " << workerPID << 
" as it was requested.");
 
  242       B2ERROR(
"Try to kill process " << workerPID << 
", but process is already gone.");
 
  245   } 
else if (pcbMulticastMessage->isMessage(EMessageTypes::c_statisticMessage)) {
 
  247     B2DEBUG(10, 
"Having received the process statistics");
 
  262     const auto& pair = *iter;
 
  264     B2ASSERT(
"This pid should not be in our list!", pair.first != 0);
 
  266     if (std::find(currentProcessList.begin(), currentProcessList.end(), pair.first) != currentProcessList.end()) {
 
  273       B2ERROR(
"An input process has died unexpected! Need to go down.");
 
  277       B2ERROR(
"An output process has died unexpected! Need to go down.");
 
  281       B2ERROR(
"A proxy process has died unexpected! Need to go down.");
 
  285       B2WARNING(
"A worker process has died unexpected. If you have requested, I will now restart the workers.");
 
  290       B2DEBUG(10, 
"An children process has died expectedly.");
 
  316     B2DEBUG(10, 
"No input and no output process around. Will go home now!");
 
  322   if (g_signalReceived > 0) {
 
  323     B2DEBUG(10, 
"Received signal " << g_signalReceived);
 
  341   if (neededWorkers < 0) {
 
  342     B2FATAL(
"Something went completely wrong here! I have more workers as requested...");
 
  344   if (neededWorkers > 0) {
 
  345     B2DEBUG(10, 
"I need to restart " << neededWorkers << 
" workers");
 
  347   return static_cast<unsigned int>(neededWorkers);
 
  352   auto correctProcType = [&procType](
const auto & pair) {
 
  353     return pair.second == procType;
 
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
static ProcType getProcType(int pid)
Return the proc type of this process.
static const std::vector< int > & getPIDList()
Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the sign...
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static bool startProxyProcess()
Fork and initialize a proxy process.
static void killAllProcesses()
Hard kill all processes.
void processMulticast(const ASocket &socket)
Process a message from the multicast.
bool m_hasEnded
Someone requested us to end the processing.
StreamHelper m_streamer
The data store streamer.
std::map< int, ProcType > m_processList
The current list of pid -> process types (to be compared to the proc handler)
void checkMulticast(int timeout=0)
check multicast for messages and kill workers if requested
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is raised.
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is raised.
void checkChildProcesses()
check the child processes, if one has died
bool m_receivedStatistics
Did we already receive the statistics?
void terminate()
Terminate the processing.
unsigned int m_requestedNumberOfWorkers
How many workers we should request to start.
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is raised.
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
bool hasWorkers() const
Check if there is at least one running worker.
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If not, kill them after timeout seconds.
unsigned int processesWithType(const ProcType &procType) const
Cound the number of processes with a certain type.
ZMQClient m_client
The client used for message processing.
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
unsigned int needMoreWorkers() const
Compare our current list of workers of how many we want to have.
void reset()
Reset the internal state.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
bool isOnline() const
Check if the client was initialized and not terminated.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
void send(AZMQMessage message) const
Send a message over the data socket.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
@ c_Proxy
Multicast Proxy Process.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Stopped
The process is stopped/killed.
@ c_Init
Before the forks, the process is in init state.
Abstract base class for different kinds of events.