Belle II Software development
RoiSenderCallback.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
#include <daq/roisend/RoiSenderCallback.h>

#include <pthread.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cerrno>
#include <csignal>
#include <cstdio>
#include <cstring>
16
17using namespace Belle2;
18using namespace std;
19
20static RoiSenderCallback* s_roisender = NULL;
21
22//-----------------------------------------------------------------
23// Rbuf-Read Thread Interface
24//-----------------------------------------------------------------
25void* RunRoiSenderLogger(void*)
26{
27 s_roisender->server();
28 return NULL;
29}
30
31
32
33
34RoiSenderCallback::RoiSenderCallback()
35{
36 // Conf file
37 m_conf = new RFConf(getenv("ROISENDER_CONFFILE"));
38
39 s_roisender = this;
40}
41
42
43void RoiSenderCallback::load(const DBObject&, const std::string&)
44{
45 // 1. Set execution directory
46 // char* chr_execdir = m_conf->getconf("expreco","execdir_base");
47 // printf ( "execdir = %s\n", chr_execdir );
48 char* nodename = m_conf->getconf("roisender", "nodename");
49
50 string execdir = string(m_conf->getconf("roisender", "execdir"));
51 printf("execdir = %s\n", execdir.c_str());
52
53 mkdir(execdir.c_str(), 0755);
54 chdir(execdir.c_str());
55
56 // 2. Initialize process manager if not existing
57 if (!m_proc) m_proc = new RFProcessManager(nodename);
58
59 // 3. Initialize log manager if not existing
60 if (!m_log) m_log = new RFLogManager(nodename, nodename);
61
62 // 4. Initialize local shared memory if not existing, this seems to be used by flow monitoring in mergemerge?
63 if (!m_shm) m_shm = new RFSharedMem(nodename);
64
65
66 // 5. Run MergerMerge
67 char* merger = m_conf->getconf("roisender", "merger");
68 char* mergerport = m_conf->getconf("roisender", "mergerport");
69// char* mergerhost = m_conf->getconf("roisender", "mergerhost");
70 char* onsenhost = m_conf->getconf("roisender", "onsenhost");
71 char* onsenport = m_conf->getconf("roisender", "onsenport");
72
73 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
74 string(m_conf->getconf("roisender", "nodename"));
75
76 char idbuf[11]; // maximum length by cppcheck
77 sprintf(idbuf, "%2.2d", 0);
78
79 // Execute merger_merge
80 m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
81 if (m_pid_merger <= 0) {
82 LogFile::error("loading %s %s %s %s %s %s failed!", merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
83 setState(RCState::ERROR_ES);
84 return;
85 }
86
87 sleep(2); // WHY a sleep? if it is a race condition find & destroy!
88
89 // 6. Start Logger -- only on first load
90 if (!m_logthread) {
91 pthread_attr_t thread_attr;
92 pthread_attr_init(&thread_attr);
93 // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
94 // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
95 // pthread_t thr_input;
96 pthread_create(&m_logthread, NULL, RunRoiSenderLogger, NULL);
97 }
98}
99
100void RoiSenderCallback::start(int /*expno*/, int /*runno*/)
101{
102 if (m_pid_merger > 0) {
103 int pid = m_pid_merger;
104 kill(pid, SIGUSR1);
105 LogFile::info("Send SIGUSR1 to (pid=%d)", pid);// attention, race condition!
106 } else {
107 LogFile::error("No merger_merge in start!");
108 setState(RCState::ERROR_ES);
109 }
110}
111
112void RoiSenderCallback::stop(void)
113{
114 if (m_pid_merger > 0) {
115 int pid = m_pid_merger;
116 kill(pid, SIGUSR2);
117 LogFile::info("Send SIGUSR2 to (pid=%d)", pid);// attention, race condition!
118 } else {
119 LogFile::error("No merger_merge in stop!");
120 setState(RCState::ERROR_ES);
121 }
122}
123
124void RoiSenderCallback::abort(void)
125{
126 // Kill processes
127 if (m_pid_merger > 0) {
128 int pid = m_pid_merger;
129 kill(pid, SIGINT);
130 LogFile::info("kill merger (pid=%d) with SIGINT ", pid);// attention, race condition!
131 }
132 // wait until
133 for (int i = 0; m_pid_merger; i++) {
134 sleep(1);
135 int pid = m_pid_merger;
136 if (i == 5 && m_pid_merger > 0) {
137 kill(pid, SIGKILL); // force!
138 LogFile::warning("kill merger (pid=%d) with SIGKILL", pid);
139 }
140 if (i == 10) {
141 LogFile::error("killing merger (pid=%d) did not work", pid);
142 m_pid_merger = 0;
143 break;
144 }
145 }
146}
147
148void RoiSenderCallback::recover(const DBObject&, const std::string&)
149{
150 // Kill processes
151 abort();
152
153 sleep(2); // WHY a sleep? if it is a race condition find & destroy!
154
155 // 1. Run merger first
156 char* merger = m_conf->getconf("roisender", "merger");
157 char* mergerport = m_conf->getconf("roisender", "mergerport");
158// char* mergerhost = m_conf->getconf("roisender", "mergerhost");
159 char* onsenhost = m_conf->getconf("roisender", "onsenhost");
160 char* onsenport = m_conf->getconf("roisender", "onsenport");
161
162 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
163 string(m_conf->getconf("roisender", "nodename"));
164
165 char idbuf[11];// maxlength by cppcheck
166 sprintf(idbuf, "%2.2d", 0);// why this?
167 m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
168 sleep(2); // WHY a sleep? if it is a race condition find & destroy!
169}
170
171void RoiSenderCallback::server()
172{
173 while (true) {
174 pid_t pid = m_proc->CheckProcess();
175 // pid==0 -> nothing to report
176 // pid <0 -> some system error ... ignore ?
177 // pide>0 -> some pid has died (in one way or the other)
178 if (pid > 0) {
179 if (pid == m_pid_merger) {
180 if (getNode().getState() == RCState::LOADING_TS
181 || getNode().getState() == RCState::STARTING_TS || getNode().getState() == RCState::STOPPING_TS
182 || getNode().getState() == RCState::READY_S || getNode().getState() == RCState::RUNNING_S) {
183 // surpress Fatal and Error if we are in Abort? race condition on m_pid_merger
184 m_log->Fatal("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
185 setState(RCState::ERROR_ES);
186 } else {
187 m_log->Info("RoiSenderCallback : merger2merge dead. pid = %d\n", pid);
188 }
189 m_pid_merger = 0; // race condition for recover!!!
190 // if you have many childs, do a continue here to avoid having to await timout of CheckOutput (1s)
191 }
192 } else if (pid < 0) {
193 perror("RoiSenderCallback::server");
194 }
195 int st = m_proc->CheckOutput();
196 // st==0 -> timeout of select
197 // st <0 -> system error ... ?
198 // st >0 some file descriptor has action
199 if (st < 0) {
200 perror("RoiSenderCallback::server");// we will never see that in the logging
201 } else if (st > 0) {
202 m_log->ProcessLog(m_proc->GetFd());
203 }
204 }
205}
int Execute(char *script, int nargs, char **args)
void load(const DBObject &, const std::string &) override
overloaded functions from base class
Abstract base class for different kinds of events.
STL namespace.