Belle II Software  release-06-00-14
RoiSenderCallback.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/roisend/RoiSenderCallback.h>
10 
11 #include <sys/stat.h>
12 #include <sys/types.h>
13 #include <unistd.h>
14 
15 #include <csignal>
16 
17 using namespace Belle2;
18 using namespace std;
19 
20 static RoiSenderCallback* s_roisender = NULL;
21 
22 //-----------------------------------------------------------------
23 // Rbuf-Read Thread Interface
24 //-----------------------------------------------------------------
25 void* RunRoiSenderLogger(void*)
26 {
27  s_roisender->server();
28  return NULL;
29 }
30 
31 
32 
33 
34 RoiSenderCallback::RoiSenderCallback()
35 {
36  // Conf file
37  m_conf = new RFConf(getenv("ROISENDER_CONFFILE"));
38 
39  s_roisender = this;
40 }
41 
42 
43 void RoiSenderCallback::load(const DBObject&, const std::string&)
44 {
45  // 1. Set execution directory
46  // char* chr_execdir = m_conf->getconf("expreco","execdir_base");
47  // printf ( "execdir = %s\n", chr_execdir );
48  char* nodename = m_conf->getconf("roisender", "nodename");
49 
50  string execdir = string(m_conf->getconf("roisender", "execdir"));
51  printf("execdir = %s\n", execdir.c_str());
52 
53  mkdir(execdir.c_str(), 0755);
54  chdir(execdir.c_str());
55 
56  // 2. Initialize process manager if not existing
57  if (!m_proc) m_proc = new RFProcessManager(nodename);
58 
59  // 3. Initialize log manager if not existing
60  if (!m_log) m_log = new RFLogManager(nodename, nodename);
61 
62  // 4. Initialize local shared memory if not existing, this seems to be used by flow monitoring in mergemerge?
63  if (!m_shm) m_shm = new RFSharedMem(nodename);
64 
65 
66  // 5. Run MergerMerge
67  char* merger = m_conf->getconf("roisender", "merger");
68  char* mergerport = m_conf->getconf("roisender", "mergerport");
69 // char* mergerhost = m_conf->getconf("roisender", "mergerhost");
70  char* onsenhost = m_conf->getconf("roisender", "onsenhost");
71  char* onsenport = m_conf->getconf("roisender", "onsenport");
72 
73  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
74  string(m_conf->getconf("roisender", "nodename"));
75 
76  char idbuf[11]; // maximum length by cppcheck
77  sprintf(idbuf, "%2.2d", 0);
78  m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
79  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
80 
81  // 6. Start Logger -- only on first load
82  if (!m_logthread) {
83  pthread_attr_t thread_attr;
84  pthread_attr_init(&thread_attr);
85  // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
86  // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
87  // pthread_t thr_input;
88  pthread_create(&m_logthread, NULL, RunRoiSenderLogger, NULL);
89  }
90 }
91 
92 void RoiSenderCallback::start(int /*expno*/, int /*runno*/)
93 {
94  if (m_pid_merger != 0) {
95  int pid = m_pid_merger;
96  kill(pid, SIGUSR1);
97  LogFile::info("Send SIGUSR1 to (pid=%d)", pid);// attention, race condition!
98  }
99 }
100 
101 void RoiSenderCallback::stop(void)
102 {
103  if (m_pid_merger != 0) {
104  int pid = m_pid_merger;
105  kill(pid, SIGUSR2);
106  LogFile::info("Send SIGUSR2 to (pid=%d)", pid);// attention, race condition!
107  }
108 }
109 
110 void RoiSenderCallback::abort(void)
111 {
112  // Kill processes
113  if (m_pid_merger != 0) {
114  int pid = m_pid_merger;
115  kill(pid, SIGINT);
116  LogFile::info("kill merger (pid=%d) with SIGINT ", pid);// attention, race condition!
117  }
118  // wait until
119  for (int i = 0; m_pid_merger; i++) {
120  sleep(1);
121  int pid = m_pid_merger;
122  if (i == 5) {
123  kill(pid, SIGKILL); // force!
124  LogFile::warning("kill merger (pid=%d) with SIGKILL", pid);
125  }
126  if (i == 10) {
127  LogFile::error("killing merger (pid=%d) did not work", pid);
128  m_pid_merger = 0;
129  break;
130  }
131  }
132 }
133 
134 void RoiSenderCallback::recover(const DBObject&, const std::string&)
135 {
136  // Kill processes
137  abort();
138 
139  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
140 
141  // 1. Run merger first
142  char* merger = m_conf->getconf("roisender", "merger");
143  char* mergerport = m_conf->getconf("roisender", "mergerport");
144 // char* mergerhost = m_conf->getconf("roisender", "mergerhost");
145  char* onsenhost = m_conf->getconf("roisender", "onsenhost");
146  char* onsenport = m_conf->getconf("roisender", "onsenport");
147 
148  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
149  string(m_conf->getconf("roisender", "nodename"));
150 
151  char idbuf[11];// maxlength by cppcheck
152  sprintf(idbuf, "%2.2d", 0);// why this?
153  m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
154  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
155 }
156 
157 void RoiSenderCallback::server()
158 {
159  while (true) {
160  pid_t pid = m_proc->CheckProcess();
161  // pid==0 -> nothing to report
162  // pid <0 -> some system error ... ignore ?
163  // pide>0 -> some pid has died (in one way or the other)
164  if (pid > 0) {
165  if (pid == m_pid_merger) {
166  if (getNode().getState() == RCState::LOADING_TS
167  || getNode().getState() == RCState::STARTING_TS || getNode().getState() == RCState::STOPPING_TS
168  || getNode().getState() == RCState::READY_S || getNode().getState() == RCState::RUNNING_S) {
169  // surpress Fatal and Error if we are in Abort? race condition on m_pid_merger
170  m_log->Fatal("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
171  setState(RCState::ERROR_ES);
172  } else {
173  m_log->Info("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
174  }
175  m_pid_merger = 0; // race condition for recover!!!
176  // if you have many childs, do a continue here to avoid having to await timout of CheckOutput (1s)
177  }
178  } else if (pid < 0) {
179  perror("RoiSenderCallback::server");
180  }
181  int st = m_proc->CheckOutput();
182  // st==0 -> timeout of select
183  // st <0 -> system error ... ?
184  // st >0 some file descriptor has action
185  if (st < 0) {
186  perror("RoiSenderCallback::server");// we will never see that in the logging
187  } else if (st > 0) {
188  m_log->ProcessLog(m_proc->GetFd());
189  }
190  }
191 }
void load(const DBObject &, const std::string &) override
overloaded functions from base class
Abstract base class for different kinds of events.