Belle II Software  release-08-01-10
RFOutputServer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/rfarm/manager/RFOutputServer.h"
10 
11 #include <sys/stat.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 
15 #include <unistd.h>
16 
17 #include <csignal>
18 #include <cstring>
19 
20 #define RFOTSOUT stdout
21 
22 using namespace std;
23 using namespace Belle2;
24 
25 RFOutputServer::RFOutputServer(string conffile)
26 {
27  // 0. Initialize configuration manager
28  m_conf = new RFConf(conffile.c_str());
29  char* nodename = m_conf->getconf("collector", "nodename");
30  // char nodename[256];
31  // gethostname(nodename, sizeof(nodename));
32 
33  // 1. Set execution directory
34  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/collector";
35 
36  mkdir(execdir.c_str(), 0755);
37  chdir(execdir.c_str());
38 
39  // 2. Initialize local shared memory
40  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
41  string(nodename);
42  m_shm = new RFSharedMem((char*)shmname.c_str());
43 
44  // 3. Initialize RingBuffers
45  // char* rbufin = m_conf->getconf("collector", "ringbufin");
46  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
47  string(m_conf->getconf("collector", "ringbufin"));
48  int rbinsize = m_conf->getconfi("collector", "ringbufinsize");
49  m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
50  // char* rbufout = m_conf->getconf("collector", "ringbufout");
51  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
52  string(m_conf->getconf("collector", "ringbufout"));
53  int rboutsize = m_conf->getconfi("collector", "ringbufoutsize");
54  m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
55 
56  // 4. Initialize process manager
57  m_proc = new RFProcessManager(nodename);
58 
59  // 5. Initialize LogManager
60  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
61 
62  // 6. Initialize data flow monitor
63  m_flow = new RFFlowStat((char*)shmname.c_str());
64 
65  // 7. Clear PID list
66  m_pid_sender = 0;
67  m_pid_basf2 = 0;
68  m_nnodes = m_conf->getconfi("processor", "nnodes");
69  for (int i = 0; i < m_nnodes; i++)
70  m_pid_receiver[i] = 0;
71 
72 }
73 
74 RFOutputServer::~RFOutputServer()
75 {
76  delete m_log;
77  delete m_proc;
78  delete m_shm;
79  delete m_conf;
80  delete m_rbufin;
81  delete m_rbufout;
82  delete m_flow;
83 }
84 
85 
86 // Functions hooked up by NSM2
87 
88 int RFOutputServer::Configure(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
89 {
90  // Start processes from down stream
91 
92  // 0. Get common parameters
93  // char* rbufin = m_conf->getconf("collector", "ringbufin");
94  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
95  string(m_conf->getconf("collector", "ringbufin"));
96  // char* rbufout = m_conf->getconf("collector", "ringbufout");
97  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
98  string(m_conf->getconf("collector", "ringbufout"));
99 
100  // char* shmname = m_conf->getconf("collector", "nodename");
101  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
102  string(m_conf->getconf("collector", "nodename"));
103 
104  // 1. Histogram Receiver
105  // char* hrecv = m_conf->getconf("collector", "historecv", "script");
106  char* hport = m_conf->getconf("collector", "historecv", "port");
107  // char* mapfile = m_conf->getconf("collector", "historecv", "mapfile");
108  // m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
109 
110  // 2. Histogram Relay
111  // char* hrelay = m_conf->getconf("collector", "historelay", "script");
112  // char* dqmdest = m_conf->getconf("dqmserver", "host");
113  // char* dqmport = m_conf->getconf("dqmserver", "port");
114  // char* interval = m_conf->getconf("collector", "historelay", "interval");
115  // m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
116 
117  // 3. Run sender / logger
118  char* src = m_conf->getconf("collector", "destination");
119  if (strstr(src, "net") != 0) {
120  // Run sender
121  char* sender = m_conf->getconf("collector", "sender", "script");
122  char* port = m_conf->getconf("collector", "sender", "port");
123  char idbuf[3];
124  sprintf(idbuf, "%2.2d", RF_OUTPUT_ID);
125  m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), idbuf);
126  m_flow->clear(RF_OUTPUT_ID);
127  } else if (strstr(src, "file") != 0) {
128  // Run file writer
129  char* writer = m_conf->getconf("collector", "writer", "script");
130  char* file = m_conf->getconf("collector", "writer", "filename");
131  char* nnode = m_conf->getconf("processor", "nnodes");
132  m_pid_sender = m_proc->Execute(writer, (char*)rbufout.c_str(), file, nnode);
133  } else {
134  // Do not run anything
135  }
136 
137  // 4. Run basf2
138  char* basf2 = m_conf->getconf("collector", "basf2", "script");
139  if (nsmm->len > 0) {
140  basf2 = (char*) nsmm->datap;
141  printf("Configure: basf2 script overridden : %s\n", basf2);
142  }
143  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
144 
145  // 5. Run receiver
146  m_nnodes = 0;
147  int maxnodes = m_conf->getconfi("processor", "nnodes");
148  int idbase = m_conf->getconfi("processor", "idbase");
149  // char* hostbase = m_conf->getconf("processor", "hostbase");
150  char* hostbase = m_conf->getconf("processor", "hostbase");
151  char* badlist = m_conf->getconf("processor", "badlist");
152  char* port = m_conf->getconf("processor", "sender", "port");
153 
154  char* receiver = m_conf->getconf("collector", "receiver", "script");
155  char hostname[512], idname[3], shmid[3];
156  for (int i = 0; i < maxnodes; i++) {
157  sprintf(idname, "%2.2d", idbase + i);
158  sprintf(shmid, "%2.2d", i);
159  if (badlist == NULL ||
160  strstr(badlist, idname) == 0) {
161  sprintf(hostname, "%s%2.2d", hostbase, idbase + i);
162  m_pid_receiver[m_nnodes] = m_proc->Execute(receiver, (char*)rbufin.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
163  m_flow->clear(i);
164  m_nnodes++;
165  }
166  }
167 
168  m_rbufin->forceClear();
169  m_rbufout->forceClear();
170  return 0;
171 }
172 
173 int RFOutputServer::UnConfigure(NSMmsg*, NSMcontext*)
174 {
175  // system("killall sock2rbr rb2sockr basf2 hrelay hserver");
176 
177  printf("m_pid_sender = %d\n", m_pid_sender);
178  printf("m_pid_basf2 = %d\n", m_pid_basf2);
179  fflush(stdout);
180  int status, ws;
181  if (m_pid_sender != 0) {
182  printf("killing sender %d\n", m_pid_sender);
183  // kill(m_pid_sender, SIGINT);
184  kill(m_pid_sender, SIGINT);
185  ws = waitpid(m_pid_sender, &status, 0);
186  printf("wait return = %d, status = %d\n", ws, status);
187  }
188 
189  if (m_pid_basf2 != 0) {
190  printf("killing sender %d\n", m_pid_sender);
191  kill(m_pid_basf2, SIGINT);
192  ws = waitpid(m_pid_basf2, &status, 0);
193  printf("wait return = %d, status = %d\n", ws, status);
194  }
195 
196  for (int i = 0; i < m_nnodes; i++) {
197  if (m_pid_receiver[i] != 0) {
198  printf("killing receiver %d\n", m_pid_receiver[i]);
199  kill(m_pid_receiver[i], SIGINT);
200  ws = waitpid(m_pid_receiver[i], &status, 0);
201  printf("wait return = %d, status = %d\n", ws, status);
202  }
203  }
204 
205  // Clear RingBuffer
206  m_rbufin->forceClear();
207  m_rbufout->forceClear();
208 
209  // Clear process list
210  m_flow->fillProcessStatus(GetNodeInfo());
211 
212  printf("Unconfigure done\n");
213  fflush(stdout);
214  return 0;
215 }
216 
217 int RFOutputServer::Start(NSMmsg*, NSMcontext*)
218 {
219  // Clear RingBuffer
220  m_rbufout->forceClear();
221  // m_rbufin->forceClear();
222  return 0;
223 }
224 
225 int RFOutputServer::Stop(NSMmsg*, NSMcontext*)
226 {
227  return 0;
228 }
229 
230 
231 int RFOutputServer::Restart(NSMmsg*, NSMcontext*)
232 {
233  printf("RFOutputServer : Restarting!!!!!!\n");
234  /* Original Impl
235  kill(m_pid_sender, SIGINT);
236  kill(m_pid_basf2, SIGINT);
237  for (int i = 0; i < m_nnodes; i++) {
238  kill(m_pid_receiver[i], SIGINT);
239  }
240  // Simple Implementation
241  system("killall sock2rbr rb2sockr basf2 hrelay hserver");
242  fflush(stdout);
243  */
244  NSMmsg* nsmmsg = NULL;
245  NSMcontext* nsmcontext = NULL;
246  RFOutputServer::UnConfigure(nsmmsg, nsmcontext);
247  sleep(2);
248  RFOutputServer::Configure(nsmmsg, nsmcontext);
249  return 0;
250 }
251 
252 // Server function
253 
254 void RFOutputServer::server()
255 {
256  m_flow->fillProcessStatus(GetNodeInfo());
257  while (true) {
258  int recv_id = 0;
259  pid_t pid = m_proc->CheckProcess();
260  if (pid > 0) {
261  printf("RFOutputServer : process dead. pid=%d\n", pid);
262  if (pid == m_pid_sender) {
263  m_log->Fatal("RFOutputServer : sender process dead. pid=%d\n", pid);
264  m_pid_sender = 0;
265  } else if (pid == m_pid_basf2) {
266  m_log->Fatal("RFOutputServer : basf2 process dead. pid=%d\n", pid);
267  m_pid_basf2 = 0;
268  } else {
269  for (int i = 0; i < m_nnodes; i++) {
270  if (pid == m_pid_receiver[i]) {
271  m_log->Fatal("RFOutputServer : receiver process %d dead. pid=%d\n", i, pid);
272  m_pid_receiver[i] = 0;
273  recv_id = i;
274  }
275  }
276  }
277  }
278 
279  int st = m_proc->CheckOutput();
280  if (st < 0) {
281  perror("RFOutputServer::server");
282  // exit ( -1 );
283  } else if (st > 0) {
284  m_log->ProcessLog(m_proc->GetFd());
285  }
286  m_flow->fillNodeInfo(RF_OUTPUT_ID, GetNodeInfo(), true);
287  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver[recv_id], m_pid_sender,
288  m_pid_basf2);
289  }
290 }
291 void RFOutputServer::cleanup()
292 {
293  printf("RFOutputServer : cleaning up\n");
294  UnConfigure(NULL, NULL);
295  printf("RFOutputServer: Done. Exitting\n");
296  exit(-1);
297 }
298 
299 
300 
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
Definition: nsm2.h:224