Belle II Software  release-05-01-25
ERecoEventProcessor.cc
1 //+
2 // File : ERecoEventProcessor.cc
3 // Description : Collect outputs from worker node and send them to EVB2
4 // w/ branch to PXD
5 //
6 // Author : Ryosuke Itoh, IPNS, KEK
7 // Date : 24 - June - 2013
8 //-
9 
10 #include "daq/expreco/ERecoEventProcessor.h"
11 #include <iostream>
12 
13 #include <sys/stat.h>
14 #include <sys/types.h>
15 #include <sys/wait.h>
16 #include <unistd.h>
17 
18 #include <csignal>
19 #include <cstring>
20 
21 #define RFOTSOUT stdout
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 ERecoEventProcessor::ERecoEventProcessor(string conffile)
27 {
28  // 0. Initialize configuration manager
29  m_conf = new RFConf(conffile.c_str());
30  // char* nodename = m_conf->getconf ( "processor", "nodename" );
31  // char nodename[256];
32  strcpy(m_nodename, "evp_");
33 
34  gethostname(&m_nodename[4], sizeof(m_nodename));
35  printf("nodename = %s\n", m_nodename);
36 
37  // 1. Set execution directory
38  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
39  printf("execdir = %s\n", execdir.c_str());
40 
41  mkdir(execdir.c_str(), 0755);
42  chdir(execdir.c_str());
43 
44  // 2. Initialize local shared memory
45  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
46  string(m_nodename);
47  m_shm = new RFSharedMem((char*)shmname.c_str());
48 
49  // 3. Initialize process manager
50  m_proc = new RFProcessManager(m_nodename);
51 
52  // 4. Initialize RingBuffers
53  // char* rbufin = m_conf->getconf("processor", "ringbufin");
54  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
55  string(m_conf->getconf("processor", "ringbufin"));
56  int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
57  // m_rbufin = new RingBuffer(rbufin, rbinsize);
58  m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
59  // char* rbufout = m_conf->getconf("processor", "ringbufout");
60  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
61  string(m_conf->getconf("processor", "ringbufout"));
62  int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
63  // m_rbufout = new RingBuffer(rbufout, rboutsize);
64  m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
65 
66  // 5. Initialize LogManager
67  m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
68 
69  // 6. Initialize data flow monitor
70  m_flow = new RFFlowStat((char*)shmname.c_str());
71 
72 }
73 
74 ERecoEventProcessor::~ERecoEventProcessor()
75 {
76  delete m_log;
77  delete m_proc;
78  delete m_shm;
79  delete m_conf;
80  delete m_flow;
81  delete m_rbufin;
82  delete m_rbufout;
83 }
84 
85 
86 // Functions hooked up by NSM2
87 
88 int ERecoEventProcessor::Configure(NSMmsg* nsmm, NSMcontext* nsmc)
89 {
90  // Start processes from down stream
91 
92  // 0. Get common parameters
93  // char* rbufin = m_conf->getconf("processor", "ringbufin");
94  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
95  string(m_conf->getconf("processor", "ringbufin"));
96  // char* rbufout = m_conf->getconf("processor", "ringbufout");
97  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
98  string(m_conf->getconf("processor", "ringbufout"));
99 
100  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
101  string(m_nodename);
102 
103  // 1. Histogram Receiver
104  char* hrecv = m_conf->getconf("processor", "historecv", "script");
105  char* hport = m_conf->getconf("processor", "historecv", "port");
106  char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
107  m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
108 
109  sleep(5); // make sure that TMapFile is created.
110 
111  // 2. Histogram Relay
112  char* hrelay = m_conf->getconf("processor", "historelay", "script");
113  char* dqmdest = m_conf->getconf("dqmserver", "host");
114  char* dqmport = m_conf->getconf("dqmserver", "port");
115  char* interval = m_conf->getconf("processor", "historelay", "interval");
116  m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
117 
118  // 3. Run basf2
119  /*
120  char* basf2 = m_conf->getconf("processor", "basf2", "script");
121  if (nsmm->len > 0) {
122  basf2 = (char*) nsmm->datap;
123  printf("Configure: basf2 script overridden : %s\n", basf2);
124  }
125  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
126  */
127 
128  // 4. Run receiver
129  char* receiver = m_conf->getconf("processor", "receiver", "script");
130  char* srchost = m_conf->getconf("distributor", "host");
131  // char* port = m_conf->getconf ( "distributor", "port" );
132  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
133  char* hostbase = m_conf->getconf("processor", "nodebase");
134  int baselen = strlen(hostbase);
135  int rport;
136  sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
137  rport += portbase;
138  char portchar[256];
139  sprintf(portchar, "%d", rport);
140  m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
141 
142  // 5. Run EventServer
143  char* evs = m_conf->getconf("processor", "eventserver", "script");
144  char* evsport = m_conf->getconf("processor", "eventserver", "port");
145  m_pid_evs = m_proc->Execute(evs, (char*)rbufout.c_str(), evsport);
146 
147  printf("Configure : done\n");
148  fflush(stdout);
149  return 0;
150 
151 }
152 
153 int ERecoEventProcessor::UnConfigure(NSMmsg*, NSMcontext*)
154 {
155  // Simple implementation to stop all processes
156  // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
157  int status;
158  if (m_pid_sender != 0) {
159  printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
160  kill(m_pid_sender, SIGINT);
161  waitpid(m_pid_sender, &status, 0);
162  }
163  if (m_pid_basf2 != 0) {
164  printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
165  kill(m_pid_basf2, SIGINT);
166  waitpid(m_pid_basf2, &status, 0);
167  }
168  if (m_pid_receiver != 0) {
169  printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
170  kill(m_pid_receiver, SIGINT);
171  waitpid(m_pid_receiver, &status, 0);
172  }
173  if (m_pid_hrecv != 0) {
174  printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
175  kill(m_pid_hrecv, SIGINT);
176  waitpid(m_pid_hrecv, &status, 0);
177  }
178  if (m_pid_hrelay != 0) {
179  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
180  kill(m_pid_hrelay, SIGINT);
181  waitpid(m_pid_hrelay, &status, 0);
182  }
183  if (m_pid_evs != 0) {
184  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
185  kill(m_pid_evs, SIGINT);
186  waitpid(m_pid_evs, &status, 0);
187  }
188 
189  // Clear RingBuffers
190  m_rbufin->forceClear();
191  m_rbufout->forceClear();
192 
193  // Clear PID list
194  m_flow->fillProcessStatus(GetNodeInfo());
195 
196  printf("Unconfigure : done\n");
197  fflush(stdout);
198  return 0;
199 }
200 
201 int ERecoEventProcessor::Start(NSMmsg* nsmm, NSMcontext* nsmc)
202 {
203  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
204  string(m_conf->getconf("processor", "ringbufin"));
205  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
206  string(m_conf->getconf("processor", "ringbufout"));
207  char* hport = m_conf->getconf("processor", "historecv", "port");
208 
209 
210  // 3. Run basf2
211  char* basf2 = m_conf->getconf("processor", "basf2", "script");
212  if (nsmm->len > 0) {
213  basf2 = (char*) nsmm->datap;
214  printf("Configure: basf2 script overridden : %s\n", basf2);
215  }
216  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
217 
218  return 0;
219 }
220 
221 int ERecoEventProcessor::Stop(NSMmsg*, NSMcontext*)
222 {
223  /*
224  char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
225  char* filename = m_conf->getconf("processor", "dqm", "file");
226  char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
227  int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
228  int status;
229  waitpid(pid_hcollect, &status, 0);
230  */
231  int status;
232  if (m_pid_basf2 != 0) {
233  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
234  kill(m_pid_basf2, SIGINT);
235  waitpid(m_pid_basf2, &status, 0);
236  m_pid_basf2 = 0;
237  }
238  return 0;
239 }
240 
241 
242 int ERecoEventProcessor::Restart(NSMmsg*, NSMcontext*)
243 {
244  printf("ERecoEventProcessor : Restarting!!!!!\n");
245  /* Original impl.
246  if (m_pid_sender != 0) {
247  printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
248  kill(m_pid_sender, SIGINT);
249  }
250  if (m_pid_basf2 != 0) {
251  printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
252  kill(m_pid_basf2, SIGINT);
253  }
254  if (m_pid_receiver != 0) {
255  printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
256  kill(m_pid_receiver, SIGINT);
257  }
258  if (m_pid_hrecv != 0) {
259  printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
260  kill(m_pid_receiver, SIGINT);
261  }
262  if (m_pid_hrelay != 0) {
263  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
264  kill(m_pid_receiver, SIGINT);
265  }
266  // Simple implementation to stop all processes
267  system("killall basf2 sock2rbr rb2sockr hrelay hserver");
268  fflush(stdout);
269  */
270  NSMmsg* nsmmsg = NULL;
271  NSMcontext* nsmcontext = NULL;
272  ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
273  sleep(2);
274  ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
275  return 0;
276 }
277 
278 // Server function
279 
280 void ERecoEventProcessor::server()
281 {
282  // Clear PID list
283  m_flow->fillProcessStatus(GetNodeInfo());
284  // Start Loop
285  while (true) {
286  pid_t pid = m_proc->CheckProcess();
287  if (pid > 0) {
288  printf("ERecoEventProcessor : process dead pid=%d\n", pid);
289  if (pid == m_pid_sender) {
290  m_log->Fatal("ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
291  m_pid_sender = 0;
292  } else if (pid == m_pid_basf2) {
293  m_log->Fatal("ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
294  m_pid_basf2 = 0;
295  } else if (pid == m_pid_receiver) {
296  m_log->Fatal("ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
297  m_pid_receiver = 0;
298  } else if (pid == m_pid_hrecv) {
299  m_log->Fatal("ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
300  m_pid_hrecv = 0;
301  } else if (pid == m_pid_hrelay) {
302  m_log->Fatal("ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
303  m_pid_hrelay = 0;
304  }
305  }
306  int st = m_proc->CheckOutput();
307  if (st < 0) {
308  perror("ERecoEventProcessor::server");
309  // exit ( -1 );
310  } else if (st > 0) {
311  m_log->ProcessLog(m_proc->GetFd());
312  }
313  m_flow->fillNodeInfo(0, GetNodeInfo(), false);
314  m_flow->fillNodeInfo(1, GetNodeInfo(), true);
315  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
316  m_pid_hrecv, m_pid_hrelay);
317  }
318 }
319 
320 
321 
NSMmsg
Definition: nsm2.h:217
Belle2::RFFlowStat
Definition: RFFlowStat.h:28
Belle2::RFSharedMem
Definition: RFSharedMem.h:51
Belle2::RFLogManager
Definition: RFLogManager.h:18
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::RFProcessManager
Definition: RFProcessManager.h:22
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RFConf
Definition: RFConf.h:24
NSMcontext_struct
Definition: nsmlib2.h:66