Belle II Software  release-08-01-10
ERecoEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/expreco/ERecoEventProcessor.h"
10 #include <iostream>
11 
12 #include <sys/stat.h>
13 #include <sys/types.h>
14 #include <sys/wait.h>
15 #include <unistd.h>
16 
17 #include <csignal>
18 #include <cstring>
19 
20 #define RFOTSOUT stdout
21 
22 using namespace std;
23 using namespace Belle2;
24 
25 ERecoEventProcessor::ERecoEventProcessor(string conffile)
26 {
27  // 0. Initialize configuration manager
28  m_conf = new RFConf(conffile.c_str());
29  // char* nodename = m_conf->getconf ( "processor", "nodename" );
30  // char nodename[256];
31  strcpy(m_nodename, "evp_");
32 
33  gethostname(&m_nodename[4], sizeof(m_nodename) - 4);
34  printf("nodename = %s\n", m_nodename);
35 
36  // 1. Set execution directory
37  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
38  printf("execdir = %s\n", execdir.c_str());
39 
40  mkdir(execdir.c_str(), 0755);
41  chdir(execdir.c_str());
42 
43  // 2. Initialize local shared memory
44  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
45  string(m_nodename);
46  m_shm = new RFSharedMem((char*)shmname.c_str());
47 
48  // 3. Initialize process manager
49  m_proc = new RFProcessManager(m_nodename);
50 
51  // 4. Initialize RingBuffers
52  // char* rbufin = m_conf->getconf("processor", "ringbufin");
53  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
54  string(m_conf->getconf("processor", "ringbufin"));
55  int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
56  // m_rbufin = new RingBuffer(rbufin, rbinsize);
57  m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
58  // char* rbufout = m_conf->getconf("processor", "ringbufout");
59  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
60  string(m_conf->getconf("processor", "ringbufout"));
61  int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
62  // m_rbufout = new RingBuffer(rbufout, rboutsize);
63  m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
64 
65  // 5. Initialize LogManager
66  m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
67 
68  // 6. Initialize data flow monitor
69  m_flow = new RFFlowStat((char*)shmname.c_str());
70 
71 }
72 
73 ERecoEventProcessor::~ERecoEventProcessor()
74 {
75  delete m_log;
76  delete m_proc;
77  delete m_shm;
78  delete m_conf;
79  delete m_flow;
80  delete m_rbufin;
81  delete m_rbufout;
82 }
83 
84 
85 // Functions hooked up by NSM2
86 
87 int ERecoEventProcessor::Configure(NSMmsg* /*nsmm*/, NSMcontext* /*nsmc*/)
88 {
89  // Start processes from down stream
90 
91  // 0. Get common parameters
92  // char* rbufin = m_conf->getconf("processor", "ringbufin");
93  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
94  string(m_conf->getconf("processor", "ringbufin"));
95  // char* rbufout = m_conf->getconf("processor", "ringbufout");
96  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
97  string(m_conf->getconf("processor", "ringbufout"));
98 
99  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
100  string(m_nodename);
101 
102  // 1. Histogram Receiver
103  char* hrecv = m_conf->getconf("processor", "historecv", "script");
104  char* hport = m_conf->getconf("processor", "historecv", "port");
105  char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
106  m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
107 
108  sleep(5); // make sure that TMapFile is created.
109 
110  // 2. Histogram Relay
111  char* hrelay = m_conf->getconf("processor", "historelay", "script");
112  char* dqmdest = m_conf->getconf("dqmserver", "host");
113  char* dqmport = m_conf->getconf("dqmserver", "port");
114  char* interval = m_conf->getconf("processor", "historelay", "interval");
115  m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
116 
117  // 3. Run basf2
118  /*
119  char* basf2 = m_conf->getconf("processor", "basf2", "script");
120  if (nsmm->len > 0) {
121  basf2 = (char*) nsmm->datap;
122  printf("Configure: basf2 script overridden : %s\n", basf2);
123  }
124  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
125  */
126 
127  // 4. Run receiver
128  char* receiver = m_conf->getconf("processor", "receiver", "script");
129  char* srchost = m_conf->getconf("distributor", "host");
130  // char* port = m_conf->getconf ( "distributor", "port" );
131  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
132  // char* hostbase = m_conf->getconf("processor", "nodebase");
133  int rport;
134  sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
135  rport += portbase;
136  char portchar[256];
137  sprintf(portchar, "%d", rport);
138  m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
139 
140  // 5. Run EventServer
141  char* evs = m_conf->getconf("processor", "eventserver", "script");
142  char* evsport = m_conf->getconf("processor", "eventserver", "port");
143  m_pid_evs = m_proc->Execute(evs, (char*)rbufout.c_str(), evsport);
144 
145  printf("Configure : done\n");
146  fflush(stdout);
147  return 0;
148 
149 }
150 
151 int ERecoEventProcessor::UnConfigure(NSMmsg*, NSMcontext*)
152 {
153  // Simple implementation to stop all processes
154  // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
155  int status;
156  if (m_pid_sender != 0) {
157  printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
158  kill(m_pid_sender, SIGINT);
159  waitpid(m_pid_sender, &status, 0);
160  }
161  if (m_pid_basf2 != 0) {
162  printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
163  kill(m_pid_basf2, SIGINT);
164  waitpid(m_pid_basf2, &status, 0);
165  }
166  if (m_pid_receiver != 0) {
167  printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
168  kill(m_pid_receiver, SIGINT);
169  waitpid(m_pid_receiver, &status, 0);
170  }
171  if (m_pid_hrecv != 0) {
172  printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
173  kill(m_pid_hrecv, SIGINT);
174  waitpid(m_pid_hrecv, &status, 0);
175  }
176  if (m_pid_hrelay != 0) {
177  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
178  kill(m_pid_hrelay, SIGINT);
179  waitpid(m_pid_hrelay, &status, 0);
180  }
181  if (m_pid_evs != 0) {
182  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
183  kill(m_pid_evs, SIGINT);
184  waitpid(m_pid_evs, &status, 0);
185  }
186 
187  // Clear RingBuffers
188  m_rbufin->forceClear();
189  m_rbufout->forceClear();
190 
191  // Clear PID list
192  m_flow->fillProcessStatus(GetNodeInfo());
193 
194  printf("Unconfigure : done\n");
195  fflush(stdout);
196  return 0;
197 }
198 
199 int ERecoEventProcessor::Start(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
200 {
201  // Clear DQM histogram memory
202  char cmdline[] = "hsendcommand DQMRC:CLEAR localhost 10391";
203  system(cmdline);
204  printf("ERecoEventProcessor : DQM TMemFile cleared\n");
205 
206  m_rbufin->clear();
207 
208  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
209  string(m_conf->getconf("processor", "ringbufin"));
210  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
211  string(m_conf->getconf("processor", "ringbufout"));
212  char* hport = m_conf->getconf("processor", "historecv", "port");
213 
214 
215  // 3. Run basf2
216  char* basf2 = m_conf->getconf("processor", "basf2", "script");
217  if (nsmm->len > 0) {
218  basf2 = (char*) nsmm->datap;
219  printf("Configure: basf2 script overridden : %s\n", basf2);
220  }
221  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
222 
223  return 0;
224 }
225 
226 int ERecoEventProcessor::Stop(NSMmsg*, NSMcontext*)
227 {
228  /*
229  char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
230  char* filename = m_conf->getconf("processor", "dqm", "file");
231  char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
232  int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
233  int status;
234  waitpid(pid_hcollect, &status, 0);
235  */
236  int status;
237  if (m_pid_basf2 != 0) {
238  printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
239  kill(m_pid_basf2, SIGINT);
240  waitpid(m_pid_basf2, &status, 0);
241  m_pid_basf2 = 0;
242  }
243  return 0;
244 }
245 
246 
247 int ERecoEventProcessor::Restart(NSMmsg*, NSMcontext*)
248 {
249  printf("ERecoEventProcessor : Restarting!!!!!\n");
250  /* Original impl.
251  if (m_pid_sender != 0) {
252  printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
253  kill(m_pid_sender, SIGINT);
254  }
255  if (m_pid_basf2 != 0) {
256  printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
257  kill(m_pid_basf2, SIGINT);
258  }
259  if (m_pid_receiver != 0) {
260  printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
261  kill(m_pid_receiver, SIGINT);
262  }
263  if (m_pid_hrecv != 0) {
264  printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
265  kill(m_pid_receiver, SIGINT);
266  }
267  if (m_pid_hrelay != 0) {
268  printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
269  kill(m_pid_receiver, SIGINT);
270  }
271  // Simple implementation to stop all processes
272  system("killall basf2 sock2rbr rb2sockr hrelay hserver");
273  fflush(stdout);
274  */
275  NSMmsg* nsmmsg = NULL;
276  NSMcontext* nsmcontext = NULL;
277  ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
278  sleep(2);
279  ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
280  return 0;
281 }
282 
283 // Server function
284 
285 void ERecoEventProcessor::server()
286 {
287  // Clear PID list
288  m_flow->fillProcessStatus(GetNodeInfo());
289  // Start Loop
290  while (true) {
291  pid_t pid = m_proc->CheckProcess();
292  if (pid > 0) {
293  printf("ERecoEventProcessor : process dead pid=%d\n", pid);
294  if (pid == m_pid_sender) {
295  m_log->Fatal("ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
296  m_pid_sender = 0;
297  } else if (pid == m_pid_basf2) {
298  m_log->Fatal("ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
299  m_pid_basf2 = 0;
300  } else if (pid == m_pid_receiver) {
301  m_log->Fatal("ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
302  m_pid_receiver = 0;
303  } else if (pid == m_pid_hrecv) {
304  m_log->Fatal("ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
305  m_pid_hrecv = 0;
306  } else if (pid == m_pid_hrelay) {
307  m_log->Fatal("ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
308  m_pid_hrelay = 0;
309  }
310  }
311  int st = m_proc->CheckOutput();
312  if (st < 0) {
313  perror("ERecoEventProcessor::server");
314  // exit ( -1 );
315  } else if (st > 0) {
316  m_log->ProcessLog(m_proc->GetFd());
317  }
318  m_flow->fillNodeInfo(0, GetNodeInfo(), false);
319  m_flow->fillNodeInfo(1, GetNodeInfo(), true);
320  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
321  m_pid_hrecv, m_pid_hrelay);
322  }
323 }
324 
325 
326 
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
Definition: nsm2.h:224