Belle II Software  release-05-01-25
RFEventProcessor.cc
1 //+
2 // File : RFEventProcessor.cc
3 // Description : Manage one event-processing node: start/stop the event receiver,
4 // basf2, the output sender and the DQM histogram receiver/relay
5 //
6 // Author : Ryosuke Itoh, IPNS, KEK
7 // Date : 24 - June - 2013
8 //-
9 
10 #include "daq/rfarm/manager/RFEventProcessor.h"
11 
12 #include <sys/stat.h>
13 #include <sys/types.h>
14 #include <sys/wait.h>
15 #include <unistd.h>
16 
17 #include <cstring>
18 #include <iostream>
19 
20 #define RFOTSOUT stdout
21 
22 //#define DESY
23 
24 using namespace std;
25 using namespace Belle2;
26 
27 RFEventProcessor::RFEventProcessor(string conffile)
28 {
29  // 0. Initialize configuration manager
30  m_conf = new RFConf(conffile.c_str());
31  // char* nodename = m_conf->getconf ( "processor", "nodename" );
32  // char nodename[256];
33  strcpy(m_nodename, "evp_");
34 #ifndef DESY
35  gethostname(&m_nodename[4], sizeof(m_nodename));
36 #else
37  // Special treatment for DESY test nodes!!
38  char hostnamebuf[256];
39  gethostname(hostnamebuf, sizeof(hostnamebuf));
40  strcat(&m_nodename[4], &hostnamebuf[6]);
41  int lend = strlen(m_nodename);
42  m_nodename[lend + 1] = (char)0;
43  m_nodename[lend] = m_nodename[lend - 1];
44  strncpy(&m_nodename[lend - 1], "0", 1);
45  // End of DESY special treatment
46 #endif
47  printf("nodename = %s\n", m_nodename);
48 
49  // 1. Set execution directory
50  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
51  printf("execdir = %s\n", execdir.c_str());
52 
53  mkdir(execdir.c_str(), 0755);
54  chdir(execdir.c_str());
55 
56  // 2. Initialize local shared memory
57  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
58  string(m_nodename);
59  m_shm = new RFSharedMem((char*)shmname.c_str());
60 
61  // 3. Initialize process manager
62  m_proc = new RFProcessManager(m_nodename);
63 
64  // 4. Initialize RingBuffers
65  // char* rbufin = m_conf->getconf("processor", "ringbufin");
66  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
67  string(m_conf->getconf("processor", "ringbufin"));
68  int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
69  // m_rbufin = new RingBuffer(rbufin, rbinsize);
70  m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
71  // char* rbufout = m_conf->getconf("processor", "ringbufout");
72  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
73  string(m_conf->getconf("processor", "ringbufout"));
74  int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
75  // m_rbufout = new RingBuffer(rbufout, rboutsize);
76  m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
77 
78  // 5. Initialize LogManager
79  m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
80 
81  // 6. Initialize data flow monitor
82  m_flow = new RFFlowStat((char*)shmname.c_str());
83 
84  // 7. Clear PID list
85  m_pid_sender = 0;
86  m_pid_basf2 = 0;
87  m_pid_receiver = 0;
88  m_pid_hrecv = 0;
89  m_pid_hrelay = 0;
90 
91 }
92 
93 RFEventProcessor::~RFEventProcessor()
94 {
95  delete m_log;
96  delete m_proc;
97  delete m_shm;
98  delete m_conf;
99  delete m_flow;
100  delete m_rbufin;
101  delete m_rbufout;
102 }
103 
104 
105 // Functions hooked up by NSM2
106 
107 int RFEventProcessor::Configure(NSMmsg* nsmm, NSMcontext* nsmc)
108 {
109  // Start processes from down stream
110 
111  // 0. Get common parameters
112  // char* rbufin = m_conf->getconf("processor", "ringbufin");
113  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
114  string(m_conf->getconf("processor", "ringbufin"));
115  // char* rbufout = m_conf->getconf("processor", "ringbufout");
116  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
117  string(m_conf->getconf("processor", "ringbufout"));
118 
119  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
120  string(m_nodename);
121 
122  // 1. Histogram Receiver
123  char* hrecv = m_conf->getconf("processor", "historecv", "script");
124  char* hport = m_conf->getconf("processor", "historecv", "port");
125  char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
126  m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
127 
128  sleep(5); // make sure that TMapFile is created.
129 
130  // 2. Histogram Relay
131  char* hrelay = m_conf->getconf("processor", "historelay", "script");
132  char* dqmdest = m_conf->getconf("dqmserver", "host");
133  char* dqmport = m_conf->getconf("dqmserver", "port");
134  char* interval = m_conf->getconf("processor", "historelay", "interval");
135  m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
136 
137  // 3. Run sender / logger
138  char* sender = m_conf->getconf("processor", "sender", "script");
139  char* port = m_conf->getconf("processor", "sender", "port");
140  m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), (char*)"1");
141  m_flow->clear(1);
142 
143  /*
144  // 4. Run basf2
145  char* basf2 = m_conf->getconf("processor", "basf2", "script");
146  if (nsmm->len > 0) {
147  basf2 = (char*) nsmm->datap;
148  printf("Configure: basf2 script overridden : %s\n", basf2);
149  }
150  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
151  */
152 
153  // 5. Run receiver
154  char* receiver = m_conf->getconf("processor", "receiver", "script");
155  char* srchost = m_conf->getconf("distributor", "host");
156  // char* port = m_conf->getconf ( "distributor", "port" );
157  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
158  char* hostbase = m_conf->getconf("processor", "nodebase");
159  int baselen = strlen(hostbase);
160  /* OLD impl
161  char hostname[256];
162  gethostname(hostname, sizeof(hostname));
163  char id[3];
164  strcpy(id, &hostname[baselen + 1]);
165  int rport = atoi(id) + portbase;
166  */
167  int rport;
168  sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
169  rport += portbase;
170  char portchar[256];
171  sprintf(portchar, "%d", rport);
172  m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
173  m_flow->clear(0);
174 
175  printf("Configure : done\n");
176  fflush(stdout);
177 
178  // 6 Clear RingBuffers
179  m_rbufin->forceClear();
180  m_rbufout->forceClear();
181 
182  return 0;
183 
184 }
185 
186 int RFEventProcessor::UnConfigure(NSMmsg*, NSMcontext*)
187 {
188  // Simple implementation to stop all processes
189  // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
190 
191  // Emergency stop
192  system("killall -9 python");
193 
194  // Normal abort
195  int status;
196  if (m_pid_sender != 0) {
197  printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
198  kill(m_pid_sender, SIGINT);
199  waitpid(m_pid_sender, &status, 0);
200  }
201  if (m_pid_basf2 != 0) {
202  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
203  // kill(m_pid_basf2, SIGINT);
204  kill(m_pid_basf2, SIGINT);
205  waitpid(m_pid_basf2, &status, 0);
206  m_pid_basf2 = 0;
207  }
208  if (m_pid_receiver != 0) {
209  printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
210  kill(m_pid_receiver, SIGINT);
211  waitpid(m_pid_receiver, &status, 0);
212  }
213  if (m_pid_hrecv != 0) {
214  printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
215  kill(m_pid_hrecv, SIGINT);
216  waitpid(m_pid_hrecv, &status, 0);
217  }
218  if (m_pid_hrelay != 0) {
219  printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
220  kill(m_pid_hrelay, SIGINT);
221  waitpid(m_pid_hrelay, &status, 0);
222  }
223 
224  // Clear RingBuffers
225  m_rbufin->forceClear();
226  m_rbufout->forceClear();
227 
228  // Clear PID list
229  m_flow->fillProcessStatus(GetNodeInfo());
230 
231  printf("Unconfigure : done\n");
232  fflush(stdout);
233  return 0;
234 }
235 
236 int RFEventProcessor::Start(NSMmsg* nsmm, NSMcontext* nsmc)
237 {
238  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
239  string(m_conf->getconf("processor", "ringbufin"));
240  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
241  string(m_conf->getconf("processor", "ringbufout"));
242  char* hport = m_conf->getconf("processor", "historecv", "port");
243 
244  // 0. Set run numbers
245  m_expno = nsmm->pars[0];
246  m_runno = nsmm->pars[1];
247 
248  // 1. Clear RingBuffers
249  m_rbufout->forceClear();
250  m_rbufin->forceClear();
251 
252  // 2. Run basf2
253  char* basf2 = m_conf->getconf("processor", "basf2", "script");
254  if (nsmm->len > 0) {
255  basf2 = (char*) nsmm->datap;
256  printf("Configure: basf2 script overridden : %s\n", basf2);
257  }
258  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
259 
260  return 0;
261 }
262 
263 int RFEventProcessor::Stop(NSMmsg* msgm, NSMcontext* msgc)
264 {
265  printf("RFEventProcessor : STOP processing started\n");
266  // fflush ( stdout );
267  /*
268  char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
269  char* filename = m_conf->getconf("processor", "dqm", "file");
270  char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
271  int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
272  int status;
273  waitpid(pid_hcollect, &status, 0);
274  */
275 
276  /*
277  // Need to wait until all the events are processed and sent
278  RfNodeInfo* nodeinfo = GetNodeInfo();
279  int ncount = 0;
280  for (;;) {
281  RfShm_Cell& cellin = m_flow->getinfo(0);
282  RfShm_Cell& cellout = m_flow->getinfo(1);
283  if (cellin.nqueue == 0 && cellout.nqueue == 0) break;
284  printf("inqueue = %d : outqueue = %d\n", cellin.nqueue, cellout.nqueue);
285  if (ncount > 30) {
286  printf("RFEventProcessor : Timeout (30sec) in stopping\n");
287  break;
288  }
289  sleep(1);
290  ncount ++;
291  }
292  */
293  // Checking number of processed events using RfShm_Cell does not work as expected and is
294  // equivalent to wait for 30 sec. Just replaced with 5 sec waiting.
295  sleep(5);
296 
297  int status;
298  if (m_pid_basf2 != 0) {
299  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
300  kill(m_pid_basf2, SIGINT);
301  waitpid(m_pid_basf2, &status, 0);
302  m_pid_basf2 = 0;
303  char outfile[1024];
304  sprintf(outfile, "dqm_e%4.4dr%6.6d.root", m_expno, m_runno);
305  std::rename("histofile.root", outfile);
306  printf("output file name = %s\n", outfile);
307  fflush(stdout);
308 
309  }
310  printf("RFEventProcessor : STOP processing done\n");
311  return 0;
312 }
313 
314 
315 int RFEventProcessor::Restart(NSMmsg*, NSMcontext*)
316 {
317  printf("RFEventProcessor : Restarting!!!!!\n");
318  /* Original impl.
319  if (m_pid_sender != 0) {
320  printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
321  kill(m_pid_sender, SIGINT);
322  }
323  if (m_pid_basf2 != 0) {
324  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
325  kill(m_pid_basf2, SIGINT);
326  }
327  if (m_pid_receiver != 0) {
328  printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
329  kill(m_pid_receiver, SIGINT);
330  }
331  if (m_pid_hrecv != 0) {
332  printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
333  kill(m_pid_receiver, SIGINT);
334  }
335  if (m_pid_hrelay != 0) {
336  printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
337  kill(m_pid_receiver, SIGINT);
338  }
339  // Simple implementation to stop all processes
340  system("killall basf2 sock2rbr rb2sockr hrelay hserver");
341  fflush(stdout);
342  */
343  NSMmsg* nsmmsg = NULL;
344  NSMcontext* nsmcontext = NULL;
345  RFEventProcessor::UnConfigure(nsmmsg, nsmcontext);
346  sleep(2);
347  RFEventProcessor::Configure(nsmmsg, nsmcontext);
348  return 0;
349 }
350 
351 // Server function
352 
353 void RFEventProcessor::server()
354 {
355  // Clear PID list
356  m_flow->fillProcessStatus(GetNodeInfo());
357  // Start Loop
358  while (true) {
359  pid_t pid = m_proc->CheckProcess();
360  if (pid > 0) {
361  printf("RFEventProcessor : process dead pid=%d\n", pid);
362  if (pid == m_pid_sender) {
363  m_log->Fatal("RFEventProcessor : sender dead. pid=%d\n", m_pid_sender);
364  m_pid_sender = 0;
365  } else if (pid == m_pid_basf2) {
366  m_log->Fatal("RFEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
367  m_pid_basf2 = 0;
368  } else if (pid == m_pid_receiver) {
369  m_log->Fatal("RFEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
370  m_pid_receiver = 0;
371  } else if (pid == m_pid_hrecv) {
372  m_log->Fatal("RFEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
373  m_pid_hrecv = 0;
374  } else if (pid == m_pid_hrelay) {
375  m_log->Fatal("RFEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
376  m_pid_hrelay = 0;
377  }
378  }
379  int st = m_proc->CheckOutput();
380  if (st < 0) {
381  perror("RFEventProcessor::server");
382  // exit ( -1 );
383  } else if (st > 0) {
384  m_log->ProcessLog(m_proc->GetFd());
385  }
386  m_flow->fillNodeInfo(0, GetNodeInfo(), false);
387  m_flow->fillNodeInfo(1, GetNodeInfo(), true);
388  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
389  m_pid_hrecv, m_pid_hrelay);
390  }
391 }
392 
393 void RFEventProcessor::cleanup()
394 {
395  printf("RFEventProcessor : cleaning up\n");
396  UnConfigure(NULL, NULL);
397  printf("RFEventProcessor: Done. Exitting\n");
398  exit(-1);
399 }
400 
401 
402 
NSMmsg
Definition: nsm2.h:217
Belle2::RFFlowStat
Definition: RFFlowStat.h:28
Belle2::RFSharedMem
Definition: RFSharedMem.h:51
Belle2::RFLogManager
Definition: RFLogManager.h:18
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::RFProcessManager
Definition: RFProcessManager.h:22
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RFConf
Definition: RFConf.h:24
NSMcontext_struct
Definition: nsmlib2.h:66