Belle II Software  release-05-01-25
RoiSenderCallback.cc
1 
2 #include <daq/roisend/RoiSenderCallback.h>
3 
4 #include <sys/stat.h>
5 #include <sys/types.h>
6 #include <unistd.h>
7 
8 #include <csignal>
9 
10 using namespace Belle2;
11 using namespace std;
12 
13 static RoiSenderCallback* s_roisender = NULL;
14 
15 //-----------------------------------------------------------------
16 // Rbuf-Read Thread Interface
17 //-----------------------------------------------------------------
18 void* RunRoiSenderLogger(void*)
19 {
20  s_roisender->server();
21  return NULL;
22 }
23 
24 
25 
26 
27 RoiSenderCallback::RoiSenderCallback()
28 {
29  // Conf file
30  m_conf = new RFConf(getenv("ROISENDER_CONFFILE"));
31 
32  s_roisender = this;
33 }
34 
35 
36 void RoiSenderCallback::load(const DBObject&, const std::string&)
37 {
38  // 1. Set execution directory
39  // char* chr_execdir = m_conf->getconf("expreco","execdir_base");
40  // printf ( "execdir = %s\n", chr_execdir );
41  char* nodename = m_conf->getconf("roisender", "nodename");
42 
43  string execdir = string(m_conf->getconf("roisender", "execdir"));
44  printf("execdir = %s\n", execdir.c_str());
45 
46  mkdir(execdir.c_str(), 0755);
47  chdir(execdir.c_str());
48 
49  // 2. Initialize process manager if not existing
50  if (!m_proc) m_proc = new RFProcessManager(nodename);
51 
52  // 3. Initialize log manager if not existing
53  if (!m_log) m_log = new RFLogManager(nodename, nodename);
54 
55  // 4. Initialize local shared memory if not existing, this seems to be used by flow monitoring in mergemerge?
56  if (!m_shm) m_shm = new RFSharedMem(nodename);
57 
58 
59  // 5. Run MergerMerge
60  char* merger = m_conf->getconf("roisender", "merger");
61  char* mergerport = m_conf->getconf("roisender", "mergerport");
62 // char* mergerhost = m_conf->getconf("roisender", "mergerhost");
63  char* onsenhost = m_conf->getconf("roisender", "onsenhost");
64  char* onsenport = m_conf->getconf("roisender", "onsenport");
65 
66  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
67  string(m_conf->getconf("roisender", "nodename"));
68 
69  char idbuf[11]; // maximum length by cppcheck
70  sprintf(idbuf, "%2.2d", 0);
71  m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
72  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
73 
74  // 6. Start Logger -- only on first load
75  if (!m_logthread) {
76  pthread_attr_t thread_attr;
77  pthread_attr_init(&thread_attr);
78  // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
79  // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
80  // pthread_t thr_input;
81  pthread_create(&m_logthread, NULL, RunRoiSenderLogger, NULL);
82  }
83 }
84 
85 void RoiSenderCallback::start(int /*expno*/, int /*runno*/)
86 {
87  if (m_pid_merger != 0) {
88  int pid = m_pid_merger;
89  kill(pid, SIGUSR1);
90  LogFile::info("Send SIGUSR1 to (pid=%d)", pid);// attention, race condition!
91  }
92 }
93 
94 void RoiSenderCallback::stop(void)
95 {
96  if (m_pid_merger != 0) {
97  int pid = m_pid_merger;
98  kill(pid, SIGUSR2);
99  LogFile::info("Send SIGUSR2 to (pid=%d)", pid);// attention, race condition!
100  }
101 }
102 
103 void RoiSenderCallback::abort(void)
104 {
105  // Kill processes
106  if (m_pid_merger != 0) {
107  int pid = m_pid_merger;
108  kill(pid, SIGINT);
109  LogFile::info("kill merger (pid=%d) with SIGINT ", pid);// attention, race condition!
110  }
111  // wait until
112  for (int i = 0; m_pid_merger; i++) {
113  sleep(1);
114  int pid = m_pid_merger;
115  if (i == 5) {
116  kill(pid, SIGKILL); // force!
117  LogFile::warning("kill merger (pid=%d) with SIGKILL", pid);
118  }
119  if (i == 10) {
120  LogFile::error("killing merger (pid=%d) did not work", pid);
121  m_pid_merger = 0;
122  break;
123  }
124  }
125 }
126 
127 void RoiSenderCallback::recover(const DBObject&, const std::string&)
128 {
129  // Kill processes
130  abort();
131 
132  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
133 
134  // 1. Run merger first
135  char* merger = m_conf->getconf("roisender", "merger");
136  char* mergerport = m_conf->getconf("roisender", "mergerport");
137 // char* mergerhost = m_conf->getconf("roisender", "mergerhost");
138  char* onsenhost = m_conf->getconf("roisender", "onsenhost");
139  char* onsenport = m_conf->getconf("roisender", "onsenport");
140 
141  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
142  string(m_conf->getconf("roisender", "nodename"));
143 
144  char idbuf[11];// maxlength by cppcheck
145  sprintf(idbuf, "%2.2d", 0);// why this?
146  m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
147  sleep(2); // WHY a sleep? if it is a race condition find & destroy!
148 }
149 
150 void RoiSenderCallback::server()
151 {
152  while (true) {
153  pid_t pid = m_proc->CheckProcess();
154  // pid==0 -> nothing to report
155  // pid <0 -> some system error ... ignore ?
156  // pide>0 -> some pid has died (in one way or the other)
157  if (pid > 0) {
158  if (pid == m_pid_merger) {
159  if (getNode().getState() == RCState::LOADING_TS
160  || getNode().getState() == RCState::STARTING_TS || getNode().getState() == RCState::STOPPING_TS
161  || getNode().getState() == RCState::READY_S || getNode().getState() == RCState::RUNNING_S) {
162  // surpress Fatal and Error if we are in Abort? race condition on m_pid_merger
163  m_log->Fatal("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
164  setState(RCState::ERROR_ES);
165  } else {
166  m_log->Info("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
167  }
168  m_pid_merger = 0; // race condition for recover!!!
169  // if you have many childs, do a continue here to avoid having to await timout of CheckOutput (1s)
170  }
171  } else if (pid < 0) {
172  perror("RoiSenderCallback::server");
173  }
174  int st = m_proc->CheckOutput();
175  // st==0 -> timeout of select
176  // st <0 -> system error ... ?
177  // st >0 some file descriptor has action
178  if (st < 0) {
179  perror("RoiSenderCallback::server");// we will never see that in the logging
180  } else if (st > 0) {
181  m_log->ProcessLog(m_proc->GetFd());
182  }
183  }
184 }
Belle2::DBObject
Definition: DBObject.h:14
Belle2::RFSharedMem
Definition: RFSharedMem.h:51
Belle2::RFLogManager
Definition: RFLogManager.h:18
Belle2::RFProcessManager
Definition: RFProcessManager.h:22
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RoiSenderCallback
Definition: RoiSenderCallback.h:17
Belle2::RoiSenderCallback::load
void load(const DBObject &, const std::string &) override
overloaded functions from base class
Definition: RoiSenderCallback.cc:36
Belle2::RFConf
Definition: RFConf.h:24