Belle II Software  release-08-01-10
RoiSenderCallback.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/roisend/RoiSenderCallback.h>
10 
11 #include <sys/stat.h>
12 #include <sys/types.h>
13 #include <unistd.h>
14 
15 #include <csignal>
16 
17 using namespace Belle2;
18 using namespace std;
19 
20 static RoiSenderCallback* s_roisender = NULL;
21 
22 //-----------------------------------------------------------------
23 // Rbuf-Read Thread Interface
24 //-----------------------------------------------------------------
25 void* RunRoiSenderLogger(void*)
26 {
27  s_roisender->server();
28  return NULL;
29 }
30 
31 
32 
33 
34 RoiSenderCallback::RoiSenderCallback()
35 {
36  // Conf file
37  m_conf = new RFConf(getenv("ROISENDER_CONFFILE"));
38 
39  s_roisender = this;
40 }
41 
42 
43 void RoiSenderCallback::load(const DBObject&, const std::string&)
44 {
45  // 1. Set execution directory
46  // char* chr_execdir = m_conf->getconf("expreco","execdir_base");
47  // printf ( "execdir = %s\n", chr_execdir );
48  char* nodename = m_conf->getconf("roisender", "nodename");
49 
50  string execdir = string(m_conf->getconf("roisender", "execdir"));
51  printf("execdir = %s\n", execdir.c_str());
52 
53  mkdir(execdir.c_str(), 0755);
54  chdir(execdir.c_str());
55 
56  // 2. Initialize process manager if not existing
57  if (!m_proc) m_proc = new RFProcessManager(nodename);
58 
59  // 3. Initialize log manager if not existing
60  if (!m_log) m_log = new RFLogManager(nodename, nodename);
61 
62  // 4. Initialize local shared memory if not existing, this seems to be used by flow monitoring in mergemerge?
63  if (!m_shm) m_shm = new RFSharedMem(nodename);
64 
65 
66  // 5. Run MergerMerge
67  char* merger = m_conf->getconf("roisender", "merger");
68  char* mergerport = m_conf->getconf("roisender", "mergerport");
69 // char* mergerhost = m_conf->getconf("roisender", "mergerhost");
70  char* onsenhost = m_conf->getconf("roisender", "onsenhost");
71  char* onsenport = m_conf->getconf("roisender", "onsenport");
72 
73  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
74  string(m_conf->getconf("roisender", "nodename"));
75 
76  char idbuf[11]; // maximum length by cppcheck
77  sprintf(idbuf, "%2.2d", 0);
78 
79  // Execute merger_merge
80  m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
81  if (m_pid_merger <= 0) {
82  LogFile::error("loading %s %s %s %s %s %s failed!", merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
83  setState(RCState::ERROR_ES);
84  return;
85  }
86 
87  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
88 
89  // 6. Start Logger -- only on first load
90  if (!m_logthread) {
91  pthread_attr_t thread_attr;
92  pthread_attr_init(&thread_attr);
93  // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
94  // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
95  // pthread_t thr_input;
96  pthread_create(&m_logthread, NULL, RunRoiSenderLogger, NULL);
97  }
98 }
99 
100 void RoiSenderCallback::start(int /*expno*/, int /*runno*/)
101 {
102  if (m_pid_merger > 0) {
103  int pid = m_pid_merger;
104  kill(pid, SIGUSR1);
105  LogFile::info("Send SIGUSR1 to (pid=%d)", pid);// attention, race condition!
106  } else {
107  LogFile::error("No merger_merge in start!");
108  setState(RCState::ERROR_ES);
109  }
110 }
111 
112 void RoiSenderCallback::stop(void)
113 {
114  if (m_pid_merger > 0) {
115  int pid = m_pid_merger;
116  kill(pid, SIGUSR2);
117  LogFile::info("Send SIGUSR2 to (pid=%d)", pid);// attention, race condition!
118  } else {
119  LogFile::error("No merger_merge in stop!");
120  setState(RCState::ERROR_ES);
121  }
122 }
123 
124 void RoiSenderCallback::abort(void)
125 {
126  // Kill processes
127  if (m_pid_merger > 0) {
128  int pid = m_pid_merger;
129  kill(pid, SIGINT);
130  LogFile::info("kill merger (pid=%d) with SIGINT ", pid);// attention, race condition!
131  }
132  // wait until
133  for (int i = 0; m_pid_merger; i++) {
134  sleep(1);
135  int pid = m_pid_merger;
136  if (i == 5 && m_pid_merger > 0) {
137  kill(pid, SIGKILL); // force!
138  LogFile::warning("kill merger (pid=%d) with SIGKILL", pid);
139  }
140  if (i == 10) {
141  LogFile::error("killing merger (pid=%d) did not work", pid);
142  m_pid_merger = 0;
143  break;
144  }
145  }
146 }
147 
148 void RoiSenderCallback::recover(const DBObject&, const std::string&)
149 {
150  // Kill processes
151  abort();
152 
153  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
154 
155  // 1. Run merger first
156  char* merger = m_conf->getconf("roisender", "merger");
157  char* mergerport = m_conf->getconf("roisender", "mergerport");
158 // char* mergerhost = m_conf->getconf("roisender", "mergerhost");
159  char* onsenhost = m_conf->getconf("roisender", "onsenhost");
160  char* onsenport = m_conf->getconf("roisender", "onsenport");
161 
162  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
163  string(m_conf->getconf("roisender", "nodename"));
164 
165  char idbuf[11];// maxlength by cppcheck
166  sprintf(idbuf, "%2.2d", 0);// why this?
167  m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
168  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
169 }
170 
171 void RoiSenderCallback::server()
172 {
173  while (true) {
174  pid_t pid = m_proc->CheckProcess();
175  // pid==0 -> nothing to report
176  // pid <0 -> some system error ... ignore ?
177  // pide>0 -> some pid has died (in one way or the other)
178  if (pid > 0) {
179  if (pid == m_pid_merger) {
180  if (getNode().getState() == RCState::LOADING_TS
181  || getNode().getState() == RCState::STARTING_TS || getNode().getState() == RCState::STOPPING_TS
182  || getNode().getState() == RCState::READY_S || getNode().getState() == RCState::RUNNING_S) {
183  // surpress Fatal and Error if we are in Abort? race condition on m_pid_merger
184  m_log->Fatal("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
185  setState(RCState::ERROR_ES);
186  } else {
187  m_log->Info("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
188  }
189  m_pid_merger = 0; // race condition for recover!!!
190  // if you have many childs, do a continue here to avoid having to await timout of CheckOutput (1s)
191  }
192  } else if (pid < 0) {
193  perror("RoiSenderCallback::server");
194  }
195  int st = m_proc->CheckOutput();
196  // st==0 -> timeout of select
197  // st <0 -> system error ... ?
198  // st >0 some file descriptor has action
199  if (st < 0) {
200  perror("RoiSenderCallback::server");// we will never see that in the logging
201  } else if (st > 0) {
202  m_log->ProcessLog(m_proc->GetFd());
203  }
204  }
205 }
void load(const DBObject &, const std::string &) override
overloaded functions from base class
Abstract base class for different kinds of events.