Belle II Software development
RFEventProcessor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include "daq/rfarm/manager/RFEventProcessor.h"
10
11#include <sys/stat.h>
12#include <sys/types.h>
13#include <sys/wait.h>
14#include <unistd.h>
15
16#include <cstring>
17#include <iostream>
18
19#define RFOTSOUT stdout
20
21//#define DESY
22
23using namespace std;
24using namespace Belle2;
25
26RFEventProcessor::RFEventProcessor(string conffile)
27{
28 // 0. Initialize configuration manager
29 m_conf = new RFConf(conffile.c_str());
30 // char* nodename = m_conf->getconf ( "processor", "nodename" );
31 // char nodename[256];
32 strcpy(m_nodename, "evp_");
33#ifndef DESY
34 gethostname(&m_nodename[4], sizeof(m_nodename) - 4);
35#else
36 // Special treatment for DESY test nodes!!
37 char hostnamebuf[256];
38 gethostname(hostnamebuf, sizeof(hostnamebuf));
39 strcat(&m_nodename[4], &hostnamebuf[6]);
40 int lend = strlen(m_nodename);
41 m_nodename[lend + 1] = (char)0;
42 m_nodename[lend] = m_nodename[lend - 1];
43 strncpy(&m_nodename[lend - 1], "0", 1);
44 // End of DESY special treatment
45#endif
46 printf("nodename = %s\n", m_nodename);
47
48 // 1. Set execution directory
49 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
50 printf("execdir = %s\n", execdir.c_str());
51
52 mkdir(execdir.c_str(), 0755);
53 chdir(execdir.c_str());
54
55 // 2. Initialize local shared memory
56 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
57 string(m_nodename);
58 m_shm = new RFSharedMem((char*)shmname.c_str());
59
60 // 3. Initialize process manager
61 m_proc = new RFProcessManager(m_nodename);
62
63 // 4. Initialize RingBuffers
64 // char* rbufin = m_conf->getconf("processor", "ringbufin");
65 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
66 string(m_conf->getconf("processor", "ringbufin"));
67 int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
68 // m_rbufin = new RingBuffer(rbufin, rbinsize);
69 m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
70 // char* rbufout = m_conf->getconf("processor", "ringbufout");
71 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
72 string(m_conf->getconf("processor", "ringbufout"));
73 int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
74 // m_rbufout = new RingBuffer(rbufout, rboutsize);
75 m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
76
77 // 5. Initialize LogManager
78 m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
79
80 // 6. Initialize data flow monitor
81 m_flow = new RFFlowStat((char*)shmname.c_str());
82
83 // 7. Clear PID list
84 m_pid_sender = 0;
85 m_pid_basf2 = 0;
86 m_pid_receiver = 0;
87 m_pid_hrecv = 0;
88 m_pid_hrelay = 0;
89
90}
91
92RFEventProcessor::~RFEventProcessor()
93{
94 delete m_log;
95 delete m_proc;
96 delete m_shm;
97 delete m_conf;
98 delete m_flow;
99 delete m_rbufin;
100 delete m_rbufout;
101}
102
103
104// Functions hooked up by NSM2
105
106int RFEventProcessor::Configure(NSMmsg* /*nsmm*/, NSMcontext* /*nsmc*/)
107{
108 // Start processes from down stream
109
110 // 0. Get common parameters
111 // char* rbufin = m_conf->getconf("processor", "ringbufin");
112 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
113 string(m_conf->getconf("processor", "ringbufin"));
114 // char* rbufout = m_conf->getconf("processor", "ringbufout");
115 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
116 string(m_conf->getconf("processor", "ringbufout"));
117
118 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
119 string(m_nodename);
120
121 // 1. Histogram Receiver
122 char* hrecv = m_conf->getconf("processor", "historecv", "script");
123 char* hport = m_conf->getconf("processor", "historecv", "port");
124 char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
125 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
126
127 sleep(5); // make sure that TMapFile is created.
128
129 // 2. Histogram Relay
130 char* hrelay = m_conf->getconf("processor", "historelay", "script");
131 char* dqmdest = m_conf->getconf("dqmserver", "host");
132 char* dqmport = m_conf->getconf("dqmserver", "port");
133 char* interval = m_conf->getconf("processor", "historelay", "interval");
134 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
135
136 // 3. Run sender / logger
137 char* sender = m_conf->getconf("processor", "sender", "script");
138 char* port = m_conf->getconf("processor", "sender", "port");
139 m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), (char*)"1");
140 m_flow->clear(1);
141
142 /*
143 // 4. Run basf2
144 char* basf2 = m_conf->getconf("processor", "basf2", "script");
145 if (nsmm->len > 0) {
146 basf2 = (char*) nsmm->datap;
147 printf("Configure: basf2 script overridden : %s\n", basf2);
148 }
149 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
150 */
151
152 // 5. Run receiver
153 char* receiver = m_conf->getconf("processor", "receiver", "script");
154 char* srchost = m_conf->getconf("distributor", "host");
155 // char* port = m_conf->getconf ( "distributor", "port" );
156 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
157 // char* hostbase = m_conf->getconf("processor", "nodebase");
158 /* OLD impl
159 int baselen = strlen(hostbase);
160 char hostname[256];
161 gethostname(hostname, sizeof(hostname));
162 char id[3];
163 strcpy(id, &hostname[baselen + 1]);
164 int rport = atoi(id) + portbase;
165 */
166 int rport;
167 sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
168 rport += portbase;
169 char portchar[256];
170 sprintf(portchar, "%d", rport);
171 m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
172 m_flow->clear(0);
173
174 printf("Configure : done\n");
175 fflush(stdout);
176
177 // 6 Clear RingBuffers
178 m_rbufin->forceClear();
179 m_rbufout->forceClear();
180
181 return 0;
182
183}
184
185int RFEventProcessor::UnConfigure(NSMmsg*, NSMcontext*)
186{
187 // Simple implementation to stop all processes
188 // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
189
190 // Emergency stop
191 system("killall -9 python");
192
193 // Normal abort
194 int status;
195 if (m_pid_sender != 0) {
196 printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
197 kill(m_pid_sender, SIGINT);
198 waitpid(m_pid_sender, &status, 0);
199 }
200 if (m_pid_basf2 != 0) {
201 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
202 // kill(m_pid_basf2, SIGINT);
203 kill(m_pid_basf2, SIGINT);
204 waitpid(m_pid_basf2, &status, 0);
205 m_pid_basf2 = 0;
206 }
207 if (m_pid_receiver != 0) {
208 printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
209 kill(m_pid_receiver, SIGINT);
210 waitpid(m_pid_receiver, &status, 0);
211 }
212 if (m_pid_hrecv != 0) {
213 printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
214 kill(m_pid_hrecv, SIGINT);
215 waitpid(m_pid_hrecv, &status, 0);
216 }
217 if (m_pid_hrelay != 0) {
218 printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
219 kill(m_pid_hrelay, SIGINT);
220 waitpid(m_pid_hrelay, &status, 0);
221 }
222
223 // Clear RingBuffers
224 m_rbufin->forceClear();
225 m_rbufout->forceClear();
226
227 // Clear PID list
228 m_flow->fillProcessStatus(GetNodeInfo());
229
230 printf("Unconfigure : done\n");
231 fflush(stdout);
232 return 0;
233}
234
235int RFEventProcessor::Start(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
236{
237 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
238 string(m_conf->getconf("processor", "ringbufin"));
239 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
240 string(m_conf->getconf("processor", "ringbufout"));
241 char* hport = m_conf->getconf("processor", "historecv", "port");
242
243 // 0. Set run numbers
244 m_expno = nsmm->pars[0];
245 m_runno = nsmm->pars[1];
246
247 // 1. Clear RingBuffers
248 m_rbufout->forceClear();
249 m_rbufin->forceClear();
250
251 // 2. Run basf2
252 char* basf2 = m_conf->getconf("processor", "basf2", "script");
253 if (nsmm->len > 0) {
254 basf2 = (char*) nsmm->datap;
255 printf("Configure: basf2 script overridden : %s\n", basf2);
256 }
257 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
258
259 return 0;
260}
261
262int RFEventProcessor::Stop(NSMmsg* /*msgm*/, NSMcontext* /*msgc*/)
263{
264 printf("RFEventProcessor : STOP processing started\n");
265 // fflush ( stdout );
266 /*
267 char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
268 char* filename = m_conf->getconf("processor", "dqm", "file");
269 char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
270 int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
271 int status;
272 waitpid(pid_hcollect, &status, 0);
273 */
274
275 /*
276 // Need to wait until all the events are processed and sent
277 RfNodeInfo* nodeinfo = GetNodeInfo();
278 int ncount = 0;
279 for (;;) {
280 RfShm_Cell& cellin = m_flow->getinfo(0);
281 RfShm_Cell& cellout = m_flow->getinfo(1);
282 if (cellin.nqueue == 0 && cellout.nqueue == 0) break;
283 printf("inqueue = %d : outqueue = %d\n", cellin.nqueue, cellout.nqueue);
284 if (ncount > 30) {
285 printf("RFEventProcessor : Timeout (30sec) in stopping\n");
286 break;
287 }
288 sleep(1);
289 ncount ++;
290 }
291 */
292 // Checking number of processed events using RfShm_Cell does not work as expected and is
293 // equivalent to wait for 30 sec. Just replaced with 5 sec waiting.
294 sleep(5);
295
296 int status;
297 if (m_pid_basf2 != 0) {
298 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
299 kill(m_pid_basf2, SIGINT);
300 waitpid(m_pid_basf2, &status, 0);
301 m_pid_basf2 = 0;
302 char outfile[1024];
303 sprintf(outfile, "dqm_e%4.4dr%6.6d.root", m_expno, m_runno);
304 std::rename("histofile.root", outfile);
305 printf("output file name = %s\n", outfile);
306 fflush(stdout);
307
308 }
309 printf("RFEventProcessor : STOP processing done\n");
310 return 0;
311}
312
313
314int RFEventProcessor::Restart(NSMmsg*, NSMcontext*)
315{
316 printf("RFEventProcessor : Restarting!!!!!\n");
317 /* Original impl.
318 if (m_pid_sender != 0) {
319 printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
320 kill(m_pid_sender, SIGINT);
321 }
322 if (m_pid_basf2 != 0) {
323 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
324 kill(m_pid_basf2, SIGINT);
325 }
326 if (m_pid_receiver != 0) {
327 printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
328 kill(m_pid_receiver, SIGINT);
329 }
330 if (m_pid_hrecv != 0) {
331 printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
332 kill(m_pid_receiver, SIGINT);
333 }
334 if (m_pid_hrelay != 0) {
335 printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
336 kill(m_pid_receiver, SIGINT);
337 }
338 // Simple implementation to stop all processes
339 system("killall basf2 sock2rbr rb2sockr hrelay hserver");
340 fflush(stdout);
341 */
342 NSMmsg* nsmmsg = NULL;
343 NSMcontext* nsmcontext = NULL;
344 RFEventProcessor::UnConfigure(nsmmsg, nsmcontext);
345 sleep(2);
346 RFEventProcessor::Configure(nsmmsg, nsmcontext);
347 return 0;
348}
349
350// Server function
351
352void RFEventProcessor::server()
353{
354 // Clear PID list
355 m_flow->fillProcessStatus(GetNodeInfo());
356 // Start Loop
357 while (true) {
358 pid_t pid = m_proc->CheckProcess();
359 if (pid > 0) {
360 printf("RFEventProcessor : process dead pid=%d\n", pid);
361 if (pid == m_pid_sender) {
362 m_log->Fatal("RFEventProcessor : sender dead. pid=%d\n", m_pid_sender);
363 m_pid_sender = 0;
364 } else if (pid == m_pid_basf2) {
365 m_log->Fatal("RFEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
366 m_pid_basf2 = 0;
367 } else if (pid == m_pid_receiver) {
368 m_log->Fatal("RFEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
369 m_pid_receiver = 0;
370 } else if (pid == m_pid_hrecv) {
371 m_log->Fatal("RFEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
372 m_pid_hrecv = 0;
373 } else if (pid == m_pid_hrelay) {
374 m_log->Fatal("RFEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
375 m_pid_hrelay = 0;
376 }
377 }
378 int st = m_proc->CheckOutput();
379 if (st < 0) {
380 perror("RFEventProcessor::server");
381 // exit ( -1 );
382 } else if (st > 0) {
383 m_log->ProcessLog(m_proc->GetFd());
384 }
385 m_flow->fillNodeInfo(0, GetNodeInfo(), false);
386 m_flow->fillNodeInfo(1, GetNodeInfo(), true);
387 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
388 m_pid_hrecv, m_pid_hrelay);
389 }
390}
391
392void RFEventProcessor::cleanup()
393{
394 printf("RFEventProcessor : cleaning up\n");
395 UnConfigure(NULL, NULL);
396 printf("RFEventProcessor: Done. Exitting\n");
397 exit(-1);
398}
399
400
401
int Execute(char *script, int nargs, char **args)
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224