Belle II Software  release-06-00-14
ERecoEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/expreco/ERecoEventProcessor.h"
10 #include <iostream>
11 
12 #include <sys/stat.h>
13 #include <sys/types.h>
14 #include <sys/wait.h>
15 #include <unistd.h>
16 
17 #include <csignal>
18 #include <cstring>
19 
20 #define RFOTSOUT stdout
21 
22 using namespace std;
23 using namespace Belle2;
24 
25 ERecoEventProcessor::ERecoEventProcessor(string conffile)
26 {
27  // 0. Initialize configuration manager
28  m_conf = new RFConf(conffile.c_str());
29  // char* nodename = m_conf->getconf ( "processor", "nodename" );
30  // char nodename[256];
31  strcpy(m_nodename, "evp_");
32 
33  gethostname(&m_nodename[4], sizeof(m_nodename));
34  printf("nodename = %s\n", m_nodename);
35 
36  // 1. Set execution directory
37  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
38  printf("execdir = %s\n", execdir.c_str());
39 
40  mkdir(execdir.c_str(), 0755);
41  chdir(execdir.c_str());
42 
43  // 2. Initialize local shared memory
44  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
45  string(m_nodename);
46  m_shm = new RFSharedMem((char*)shmname.c_str());
47 
48  // 3. Initialize process manager
49  m_proc = new RFProcessManager(m_nodename);
50 
51  // 4. Initialize RingBuffers
52  // char* rbufin = m_conf->getconf("processor", "ringbufin");
53  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
54  string(m_conf->getconf("processor", "ringbufin"));
55  int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
56  // m_rbufin = new RingBuffer(rbufin, rbinsize);
57  m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
58  // char* rbufout = m_conf->getconf("processor", "ringbufout");
59  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
60  string(m_conf->getconf("processor", "ringbufout"));
61  int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
62  // m_rbufout = new RingBuffer(rbufout, rboutsize);
63  m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
64 
65  // 5. Initialize LogManager
66  m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
67 
68  // 6. Initialize data flow monitor
69  m_flow = new RFFlowStat((char*)shmname.c_str());
70 
71 }
72 
73 ERecoEventProcessor::~ERecoEventProcessor()
74 {
75  delete m_log;
76  delete m_proc;
77  delete m_shm;
78  delete m_conf;
79  delete m_flow;
80  delete m_rbufin;
81  delete m_rbufout;
82 }
83 
84 
85 // Functions hooked up by NSM2
86 
87 int ERecoEventProcessor::Configure(NSMmsg* nsmm, NSMcontext* nsmc)
88 {
89  // Start processes from down stream
90 
91  // 0. Get common parameters
92  // char* rbufin = m_conf->getconf("processor", "ringbufin");
93  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
94  string(m_conf->getconf("processor", "ringbufin"));
95  // char* rbufout = m_conf->getconf("processor", "ringbufout");
96  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
97  string(m_conf->getconf("processor", "ringbufout"));
98 
99  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
100  string(m_nodename);
101 
102  // 1. Histogram Receiver
103  char* hrecv = m_conf->getconf("processor", "historecv", "script");
104  char* hport = m_conf->getconf("processor", "historecv", "port");
105  char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
106  m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
107 
108  sleep(5); // make sure that TMapFile is created.
109 
110  // 2. Histogram Relay
111  char* hrelay = m_conf->getconf("processor", "historelay", "script");
112  char* dqmdest = m_conf->getconf("dqmserver", "host");
113  char* dqmport = m_conf->getconf("dqmserver", "port");
114  char* interval = m_conf->getconf("processor", "historelay", "interval");
115  m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
116 
117  // 3. Run basf2
118  /*
119  char* basf2 = m_conf->getconf("processor", "basf2", "script");
120  if (nsmm->len > 0) {
121  basf2 = (char*) nsmm->datap;
122  printf("Configure: basf2 script overridden : %s\n", basf2);
123  }
124  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
125  */
126 
127  // 4. Run receiver
128  char* receiver = m_conf->getconf("processor", "receiver", "script");
129  char* srchost = m_conf->getconf("distributor", "host");
130  // char* port = m_conf->getconf ( "distributor", "port" );
131  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
132  char* hostbase = m_conf->getconf("processor", "nodebase");
133  int baselen = strlen(hostbase);
134  int rport;
135  sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
136  rport += portbase;
137  char portchar[256];
138  sprintf(portchar, "%d", rport);
139  m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
140 
141  // 5. Run EventServer
142  char* evs = m_conf->getconf("processor", "eventserver", "script");
143  char* evsport = m_conf->getconf("processor", "eventserver", "port");
144  m_pid_evs = m_proc->Execute(evs, (char*)rbufout.c_str(), evsport);
145 
146  printf("Configure : done\n");
147  fflush(stdout);
148  return 0;
149 
150 }
151 
152 int ERecoEventProcessor::UnConfigure(NSMmsg*, NSMcontext*)
153 {
154  // Simple implementation to stop all processes
155  // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
156  int status;
157  if (m_pid_sender != 0) {
158  printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
159  kill(m_pid_sender, SIGINT);
160  waitpid(m_pid_sender, &status, 0);
161  }
162  if (m_pid_basf2 != 0) {
163  printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
164  kill(m_pid_basf2, SIGINT);
165  waitpid(m_pid_basf2, &status, 0);
166  }
167  if (m_pid_receiver != 0) {
168  printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
169  kill(m_pid_receiver, SIGINT);
170  waitpid(m_pid_receiver, &status, 0);
171  }
172  if (m_pid_hrecv != 0) {
173  printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
174  kill(m_pid_hrecv, SIGINT);
175  waitpid(m_pid_hrecv, &status, 0);
176  }
177  if (m_pid_hrelay != 0) {
178  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
179  kill(m_pid_hrelay, SIGINT);
180  waitpid(m_pid_hrelay, &status, 0);
181  }
182  if (m_pid_evs != 0) {
183  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
184  kill(m_pid_evs, SIGINT);
185  waitpid(m_pid_evs, &status, 0);
186  }
187 
188  // Clear RingBuffers
189  m_rbufin->forceClear();
190  m_rbufout->forceClear();
191 
192  // Clear PID list
193  m_flow->fillProcessStatus(GetNodeInfo());
194 
195  printf("Unconfigure : done\n");
196  fflush(stdout);
197  return 0;
198 }
199 
200 int ERecoEventProcessor::Start(NSMmsg* nsmm, NSMcontext* nsmc)
201 {
202  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
203  string(m_conf->getconf("processor", "ringbufin"));
204  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
205  string(m_conf->getconf("processor", "ringbufout"));
206  char* hport = m_conf->getconf("processor", "historecv", "port");
207 
208 
209  // 3. Run basf2
210  char* basf2 = m_conf->getconf("processor", "basf2", "script");
211  if (nsmm->len > 0) {
212  basf2 = (char*) nsmm->datap;
213  printf("Configure: basf2 script overridden : %s\n", basf2);
214  }
215  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
216 
217  return 0;
218 }
219 
220 int ERecoEventProcessor::Stop(NSMmsg*, NSMcontext*)
221 {
222  /*
223  char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
224  char* filename = m_conf->getconf("processor", "dqm", "file");
225  char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
226  int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
227  int status;
228  waitpid(pid_hcollect, &status, 0);
229  */
230  int status;
231  if (m_pid_basf2 != 0) {
232  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
233  kill(m_pid_basf2, SIGINT);
234  waitpid(m_pid_basf2, &status, 0);
235  m_pid_basf2 = 0;
236  }
237  return 0;
238 }
239 
240 
241 int ERecoEventProcessor::Restart(NSMmsg*, NSMcontext*)
242 {
243  printf("ERecoEventProcessor : Restarting!!!!!\n");
244  /* Original impl.
245  if (m_pid_sender != 0) {
246  printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
247  kill(m_pid_sender, SIGINT);
248  }
249  if (m_pid_basf2 != 0) {
250  printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
251  kill(m_pid_basf2, SIGINT);
252  }
253  if (m_pid_receiver != 0) {
254  printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
255  kill(m_pid_receiver, SIGINT);
256  }
257  if (m_pid_hrecv != 0) {
258  printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
259  kill(m_pid_receiver, SIGINT);
260  }
261  if (m_pid_hrelay != 0) {
262  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
263  kill(m_pid_receiver, SIGINT);
264  }
265  // Simple implementation to stop all processes
266  system("killall basf2 sock2rbr rb2sockr hrelay hserver");
267  fflush(stdout);
268  */
269  NSMmsg* nsmmsg = NULL;
270  NSMcontext* nsmcontext = NULL;
271  ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
272  sleep(2);
273  ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
274  return 0;
275 }
276 
277 // Server function
278 
279 void ERecoEventProcessor::server()
280 {
281  // Clear PID list
282  m_flow->fillProcessStatus(GetNodeInfo());
283  // Start Loop
284  while (true) {
285  pid_t pid = m_proc->CheckProcess();
286  if (pid > 0) {
287  printf("ERecoEventProcessor : process dead pid=%d\n", pid);
288  if (pid == m_pid_sender) {
289  m_log->Fatal("ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
290  m_pid_sender = 0;
291  } else if (pid == m_pid_basf2) {
292  m_log->Fatal("ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
293  m_pid_basf2 = 0;
294  } else if (pid == m_pid_receiver) {
295  m_log->Fatal("ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
296  m_pid_receiver = 0;
297  } else if (pid == m_pid_hrecv) {
298  m_log->Fatal("ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
299  m_pid_hrecv = 0;
300  } else if (pid == m_pid_hrelay) {
301  m_log->Fatal("ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
302  m_pid_hrelay = 0;
303  }
304  }
305  int st = m_proc->CheckOutput();
306  if (st < 0) {
307  perror("ERecoEventProcessor::server");
308  // exit ( -1 );
309  } else if (st > 0) {
310  m_log->ProcessLog(m_proc->GetFd());
311  }
312  m_flow->fillNodeInfo(0, GetNodeInfo(), false);
313  m_flow->fillNodeInfo(1, GetNodeInfo(), true);
314  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
315  m_pid_hrecv, m_pid_hrelay);
316  }
317 }
318 
319 
320 
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
Definition: nsm2.h:224