Belle II Software  release-08-01-10
RFEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/rfarm/manager/RFEventProcessor.h"
10 
11 #include <sys/stat.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 #include <unistd.h>
15 
16 #include <cstring>
17 #include <iostream>
18 
19 #define RFOTSOUT stdout
20 
21 //#define DESY
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 RFEventProcessor::RFEventProcessor(string conffile)
27 {
28  // 0. Initialize configuration manager
29  m_conf = new RFConf(conffile.c_str());
30  // char* nodename = m_conf->getconf ( "processor", "nodename" );
31  // char nodename[256];
32  strcpy(m_nodename, "evp_");
33 #ifndef DESY
34  gethostname(&m_nodename[4], sizeof(m_nodename) - 4);
35 #else
36  // Special treatment for DESY test nodes!!
37  char hostnamebuf[256];
38  gethostname(hostnamebuf, sizeof(hostnamebuf));
39  strcat(&m_nodename[4], &hostnamebuf[6]);
40  int lend = strlen(m_nodename);
41  m_nodename[lend + 1] = (char)0;
42  m_nodename[lend] = m_nodename[lend - 1];
43  strncpy(&m_nodename[lend - 1], "0", 1);
44  // End of DESY special treatment
45 #endif
46  printf("nodename = %s\n", m_nodename);
47 
48  // 1. Set execution directory
49  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
50  printf("execdir = %s\n", execdir.c_str());
51 
52  mkdir(execdir.c_str(), 0755);
53  chdir(execdir.c_str());
54 
55  // 2. Initialize local shared memory
56  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
57  string(m_nodename);
58  m_shm = new RFSharedMem((char*)shmname.c_str());
59 
60  // 3. Initialize process manager
61  m_proc = new RFProcessManager(m_nodename);
62 
63  // 4. Initialize RingBuffers
64  // char* rbufin = m_conf->getconf("processor", "ringbufin");
65  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
66  string(m_conf->getconf("processor", "ringbufin"));
67  int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
68  // m_rbufin = new RingBuffer(rbufin, rbinsize);
69  m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
70  // char* rbufout = m_conf->getconf("processor", "ringbufout");
71  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
72  string(m_conf->getconf("processor", "ringbufout"));
73  int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
74  // m_rbufout = new RingBuffer(rbufout, rboutsize);
75  m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
76 
77  // 5. Initialize LogManager
78  m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
79 
80  // 6. Initialize data flow monitor
81  m_flow = new RFFlowStat((char*)shmname.c_str());
82 
83  // 7. Clear PID list
84  m_pid_sender = 0;
85  m_pid_basf2 = 0;
86  m_pid_receiver = 0;
87  m_pid_hrecv = 0;
88  m_pid_hrelay = 0;
89 
90 }
91 
92 RFEventProcessor::~RFEventProcessor()
93 {
94  delete m_log;
95  delete m_proc;
96  delete m_shm;
97  delete m_conf;
98  delete m_flow;
99  delete m_rbufin;
100  delete m_rbufout;
101 }
102 
103 
104 // Functions hooked up by NSM2
105 
106 int RFEventProcessor::Configure(NSMmsg* /*nsmm*/, NSMcontext* /*nsmc*/)
107 {
108  // Start processes from down stream
109 
110  // 0. Get common parameters
111  // char* rbufin = m_conf->getconf("processor", "ringbufin");
112  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
113  string(m_conf->getconf("processor", "ringbufin"));
114  // char* rbufout = m_conf->getconf("processor", "ringbufout");
115  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
116  string(m_conf->getconf("processor", "ringbufout"));
117 
118  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
119  string(m_nodename);
120 
121  // 1. Histogram Receiver
122  char* hrecv = m_conf->getconf("processor", "historecv", "script");
123  char* hport = m_conf->getconf("processor", "historecv", "port");
124  char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
125  m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
126 
127  sleep(5); // make sure that TMapFile is created.
128 
129  // 2. Histogram Relay
130  char* hrelay = m_conf->getconf("processor", "historelay", "script");
131  char* dqmdest = m_conf->getconf("dqmserver", "host");
132  char* dqmport = m_conf->getconf("dqmserver", "port");
133  char* interval = m_conf->getconf("processor", "historelay", "interval");
134  m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
135 
136  // 3. Run sender / logger
137  char* sender = m_conf->getconf("processor", "sender", "script");
138  char* port = m_conf->getconf("processor", "sender", "port");
139  m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), (char*)"1");
140  m_flow->clear(1);
141 
142  /*
143  // 4. Run basf2
144  char* basf2 = m_conf->getconf("processor", "basf2", "script");
145  if (nsmm->len > 0) {
146  basf2 = (char*) nsmm->datap;
147  printf("Configure: basf2 script overridden : %s\n", basf2);
148  }
149  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
150  */
151 
152  // 5. Run receiver
153  char* receiver = m_conf->getconf("processor", "receiver", "script");
154  char* srchost = m_conf->getconf("distributor", "host");
155  // char* port = m_conf->getconf ( "distributor", "port" );
156  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
157  // char* hostbase = m_conf->getconf("processor", "nodebase");
158  /* OLD impl
159  int baselen = strlen(hostbase);
160  char hostname[256];
161  gethostname(hostname, sizeof(hostname));
162  char id[3];
163  strcpy(id, &hostname[baselen + 1]);
164  int rport = atoi(id) + portbase;
165  */
166  int rport;
167  sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
168  rport += portbase;
169  char portchar[256];
170  sprintf(portchar, "%d", rport);
171  m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
172  m_flow->clear(0);
173 
174  printf("Configure : done\n");
175  fflush(stdout);
176 
177  // 6 Clear RingBuffers
178  m_rbufin->forceClear();
179  m_rbufout->forceClear();
180 
181  return 0;
182 
183 }
184 
185 int RFEventProcessor::UnConfigure(NSMmsg*, NSMcontext*)
186 {
187  // Simple implementation to stop all processes
188  // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
189 
190  // Emergency stop
191  system("killall -9 python");
192 
193  // Normal abort
194  int status;
195  if (m_pid_sender != 0) {
196  printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
197  kill(m_pid_sender, SIGINT);
198  waitpid(m_pid_sender, &status, 0);
199  }
200  if (m_pid_basf2 != 0) {
201  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
202  // kill(m_pid_basf2, SIGINT);
203  kill(m_pid_basf2, SIGINT);
204  waitpid(m_pid_basf2, &status, 0);
205  m_pid_basf2 = 0;
206  }
207  if (m_pid_receiver != 0) {
208  printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
209  kill(m_pid_receiver, SIGINT);
210  waitpid(m_pid_receiver, &status, 0);
211  }
212  if (m_pid_hrecv != 0) {
213  printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
214  kill(m_pid_hrecv, SIGINT);
215  waitpid(m_pid_hrecv, &status, 0);
216  }
217  if (m_pid_hrelay != 0) {
218  printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
219  kill(m_pid_hrelay, SIGINT);
220  waitpid(m_pid_hrelay, &status, 0);
221  }
222 
223  // Clear RingBuffers
224  m_rbufin->forceClear();
225  m_rbufout->forceClear();
226 
227  // Clear PID list
228  m_flow->fillProcessStatus(GetNodeInfo());
229 
230  printf("Unconfigure : done\n");
231  fflush(stdout);
232  return 0;
233 }
234 
235 int RFEventProcessor::Start(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
236 {
237  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
238  string(m_conf->getconf("processor", "ringbufin"));
239  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
240  string(m_conf->getconf("processor", "ringbufout"));
241  char* hport = m_conf->getconf("processor", "historecv", "port");
242 
243  // 0. Set run numbers
244  m_expno = nsmm->pars[0];
245  m_runno = nsmm->pars[1];
246 
247  // 1. Clear RingBuffers
248  m_rbufout->forceClear();
249  m_rbufin->forceClear();
250 
251  // 2. Run basf2
252  char* basf2 = m_conf->getconf("processor", "basf2", "script");
253  if (nsmm->len > 0) {
254  basf2 = (char*) nsmm->datap;
255  printf("Configure: basf2 script overridden : %s\n", basf2);
256  }
257  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
258 
259  return 0;
260 }
261 
262 int RFEventProcessor::Stop(NSMmsg* /*msgm*/, NSMcontext* /*msgc*/)
263 {
264  printf("RFEventProcessor : STOP processing started\n");
265  // fflush ( stdout );
266  /*
267  char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
268  char* filename = m_conf->getconf("processor", "dqm", "file");
269  char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
270  int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
271  int status;
272  waitpid(pid_hcollect, &status, 0);
273  */
274 
275  /*
276  // Need to wait until all the events are processed and sent
277  RfNodeInfo* nodeinfo = GetNodeInfo();
278  int ncount = 0;
279  for (;;) {
280  RfShm_Cell& cellin = m_flow->getinfo(0);
281  RfShm_Cell& cellout = m_flow->getinfo(1);
282  if (cellin.nqueue == 0 && cellout.nqueue == 0) break;
283  printf("inqueue = %d : outqueue = %d\n", cellin.nqueue, cellout.nqueue);
284  if (ncount > 30) {
285  printf("RFEventProcessor : Timeout (30sec) in stopping\n");
286  break;
287  }
288  sleep(1);
289  ncount ++;
290  }
291  */
292  // Checking number of processed events using RfShm_Cell does not work as expected and is
293  // equivalent to wait for 30 sec. Just replaced with 5 sec waiting.
294  sleep(5);
295 
296  int status;
297  if (m_pid_basf2 != 0) {
298  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
299  kill(m_pid_basf2, SIGINT);
300  waitpid(m_pid_basf2, &status, 0);
301  m_pid_basf2 = 0;
302  char outfile[1024];
303  sprintf(outfile, "dqm_e%4.4dr%6.6d.root", m_expno, m_runno);
304  std::rename("histofile.root", outfile);
305  printf("output file name = %s\n", outfile);
306  fflush(stdout);
307 
308  }
309  printf("RFEventProcessor : STOP processing done\n");
310  return 0;
311 }
312 
313 
314 int RFEventProcessor::Restart(NSMmsg*, NSMcontext*)
315 {
316  printf("RFEventProcessor : Restarting!!!!!\n");
317  /* Original impl.
318  if (m_pid_sender != 0) {
319  printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
320  kill(m_pid_sender, SIGINT);
321  }
322  if (m_pid_basf2 != 0) {
323  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
324  kill(m_pid_basf2, SIGINT);
325  }
326  if (m_pid_receiver != 0) {
327  printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
328  kill(m_pid_receiver, SIGINT);
329  }
330  if (m_pid_hrecv != 0) {
331  printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
332  kill(m_pid_receiver, SIGINT);
333  }
334  if (m_pid_hrelay != 0) {
335  printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
336  kill(m_pid_receiver, SIGINT);
337  }
338  // Simple implementation to stop all processes
339  system("killall basf2 sock2rbr rb2sockr hrelay hserver");
340  fflush(stdout);
341  */
342  NSMmsg* nsmmsg = NULL;
343  NSMcontext* nsmcontext = NULL;
344  RFEventProcessor::UnConfigure(nsmmsg, nsmcontext);
345  sleep(2);
346  RFEventProcessor::Configure(nsmmsg, nsmcontext);
347  return 0;
348 }
349 
350 // Server function
351 
352 void RFEventProcessor::server()
353 {
354  // Clear PID list
355  m_flow->fillProcessStatus(GetNodeInfo());
356  // Start Loop
357  while (true) {
358  pid_t pid = m_proc->CheckProcess();
359  if (pid > 0) {
360  printf("RFEventProcessor : process dead pid=%d\n", pid);
361  if (pid == m_pid_sender) {
362  m_log->Fatal("RFEventProcessor : sender dead. pid=%d\n", m_pid_sender);
363  m_pid_sender = 0;
364  } else if (pid == m_pid_basf2) {
365  m_log->Fatal("RFEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
366  m_pid_basf2 = 0;
367  } else if (pid == m_pid_receiver) {
368  m_log->Fatal("RFEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
369  m_pid_receiver = 0;
370  } else if (pid == m_pid_hrecv) {
371  m_log->Fatal("RFEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
372  m_pid_hrecv = 0;
373  } else if (pid == m_pid_hrelay) {
374  m_log->Fatal("RFEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
375  m_pid_hrelay = 0;
376  }
377  }
378  int st = m_proc->CheckOutput();
379  if (st < 0) {
380  perror("RFEventProcessor::server");
381  // exit ( -1 );
382  } else if (st > 0) {
383  m_log->ProcessLog(m_proc->GetFd());
384  }
385  m_flow->fillNodeInfo(0, GetNodeInfo(), false);
386  m_flow->fillNodeInfo(1, GetNodeInfo(), true);
387  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
388  m_pid_hrecv, m_pid_hrelay);
389  }
390 }
391 
392 void RFEventProcessor::cleanup()
393 {
394  printf("RFEventProcessor : cleaning up\n");
395  UnConfigure(NULL, NULL);
396  printf("RFEventProcessor: Done. Exitting\n");
397  exit(-1);
398 }
399 
400 
401 
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
Definition: nsm2.h:224