9#include "daq/rfarm/manager/RFEventProcessor.h"
19#define RFOTSOUT stdout
26RFEventProcessor::RFEventProcessor(
string conffile)
29 m_conf =
new RFConf(conffile.c_str());
32 strcpy(m_nodename,
"evp_");
34 gethostname(&m_nodename[4],
sizeof(m_nodename) - 4);
37 char hostnamebuf[256];
38 gethostname(hostnamebuf,
sizeof(hostnamebuf));
39 strcat(&m_nodename[4], &hostnamebuf[6]);
40 int lend = strlen(m_nodename);
41 m_nodename[lend + 1] = (char)0;
42 m_nodename[lend] = m_nodename[lend - 1];
43 strncpy(&m_nodename[lend - 1],
"0", 1);
46 printf(
"nodename = %s\n", m_nodename);
49 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/" + string(m_nodename);
50 printf(
"execdir = %s\n", execdir.c_str());
52 mkdir(execdir.c_str(), 0755);
53 chdir(execdir.c_str());
56 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
65 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
66 string(m_conf->getconf(
"processor",
"ringbufin"));
67 int rbinsize = m_conf->getconfi(
"processor",
"ringbufinsize");
69 m_rbufin =
new RingBuffer(rbufin.c_str(), rbinsize);
71 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
72 string(m_conf->getconf(
"processor",
"ringbufout"));
73 int rboutsize = m_conf->getconfi(
"processor",
"ringbufoutsize");
75 m_rbufout =
new RingBuffer(rbufout.c_str(), rboutsize);
78 m_log =
new RFLogManager(m_nodename, m_conf->getconf(
"system",
"lognode"));
81 m_flow =
new RFFlowStat((
char*)shmname.c_str());
92RFEventProcessor::~RFEventProcessor()
112 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
113 string(m_conf->getconf(
"processor",
"ringbufin"));
115 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
116 string(m_conf->getconf(
"processor",
"ringbufout"));
118 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
122 char* hrecv = m_conf->getconf(
"processor",
"historecv",
"script");
123 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
124 char* mapfile = m_conf->getconf(
"processor",
"historecv",
"mapfile");
125 m_pid_hrecv = m_proc->
Execute(hrecv, hport, mapfile);
130 char* hrelay = m_conf->getconf(
"processor",
"historelay",
"script");
131 char* dqmdest = m_conf->getconf(
"dqmserver",
"host");
132 char* dqmport = m_conf->getconf(
"dqmserver",
"port");
133 char* interval = m_conf->getconf(
"processor",
"historelay",
"interval");
134 m_pid_hrelay = m_proc->
Execute(hrelay, mapfile, dqmdest, dqmport, interval);
137 char* sender = m_conf->getconf(
"processor",
"sender",
"script");
138 char* port = m_conf->getconf(
"processor",
"sender",
"port");
139 m_pid_sender = m_proc->
Execute(sender, (
char*)rbufout.c_str(), port, (
char*)shmname.c_str(), (
char*)
"1");
153 char* receiver = m_conf->getconf(
"processor",
"receiver",
"script");
154 char* srchost = m_conf->getconf(
"distributor",
"host");
156 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
167 sscanf(&m_nodename[strlen(m_nodename) - 2],
"%d", &rport);
170 sprintf(portchar,
"%d", rport);
171 m_pid_receiver = m_proc->
Execute(receiver, (
char*)rbufin.c_str(), srchost, portchar, (
char*)shmname.c_str(), (
char*)
"0");
174 printf(
"Configure : done\n");
191 system(
"killall -9 python");
195 if (m_pid_sender != 0) {
196 printf(
"RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
197 kill(m_pid_sender, SIGINT);
198 waitpid(m_pid_sender, &status, 0);
200 if (m_pid_basf2 != 0) {
201 printf(
"RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
203 kill(m_pid_basf2, SIGINT);
204 waitpid(m_pid_basf2, &status, 0);
207 if (m_pid_receiver != 0) {
208 printf(
"RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
209 kill(m_pid_receiver, SIGINT);
210 waitpid(m_pid_receiver, &status, 0);
212 if (m_pid_hrecv != 0) {
213 printf(
"RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
214 kill(m_pid_hrecv, SIGINT);
215 waitpid(m_pid_hrecv, &status, 0);
217 if (m_pid_hrelay != 0) {
218 printf(
"RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
219 kill(m_pid_hrelay, SIGINT);
220 waitpid(m_pid_hrelay, &status, 0);
228 m_flow->fillProcessStatus(GetNodeInfo());
230 printf(
"Unconfigure : done\n");
237 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
238 string(m_conf->getconf(
"processor",
"ringbufin"));
239 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
240 string(m_conf->getconf(
"processor",
"ringbufout"));
241 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
244 m_expno = nsmm->pars[0];
245 m_runno = nsmm->pars[1];
252 char* basf2 = m_conf->getconf(
"processor",
"basf2",
"script");
254 basf2 = (
char*) nsmm->datap;
255 printf(
"Configure: basf2 script overridden : %s\n", basf2);
257 m_pid_basf2 = m_proc->
Execute(basf2, (
char*)rbufin.c_str(), (
char*)rbufout.c_str(), hport);
264 printf(
"RFEventProcessor : STOP processing started\n");
297 if (m_pid_basf2 != 0) {
298 printf(
"RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
299 kill(m_pid_basf2, SIGINT);
300 waitpid(m_pid_basf2, &status, 0);
303 sprintf(outfile,
"dqm_e%4.4dr%6.6d.root", m_expno, m_runno);
304 std::rename(
"histofile.root", outfile);
305 printf(
"output file name = %s\n", outfile);
309 printf(
"RFEventProcessor : STOP processing done\n");
316 printf(
"RFEventProcessor : Restarting!!!!!\n");
344 RFEventProcessor::UnConfigure(nsmmsg, nsmcontext);
346 RFEventProcessor::Configure(nsmmsg, nsmcontext);
352void RFEventProcessor::server()
355 m_flow->fillProcessStatus(GetNodeInfo());
358 pid_t pid = m_proc->CheckProcess();
360 printf(
"RFEventProcessor : process dead pid=%d\n", pid);
361 if (pid == m_pid_sender) {
362 m_log->Fatal(
"RFEventProcessor : sender dead. pid=%d\n", m_pid_sender);
364 }
else if (pid == m_pid_basf2) {
365 m_log->Fatal(
"RFEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
367 }
else if (pid == m_pid_receiver) {
368 m_log->Fatal(
"RFEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
370 }
else if (pid == m_pid_hrecv) {
371 m_log->Fatal(
"RFEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
373 }
else if (pid == m_pid_hrelay) {
374 m_log->Fatal(
"RFEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
378 int st = m_proc->CheckOutput();
380 perror(
"RFEventProcessor::server");
383 m_log->ProcessLog(m_proc->GetFd());
385 m_flow->fillNodeInfo(0, GetNodeInfo(),
false);
386 m_flow->fillNodeInfo(1, GetNodeInfo(),
true);
387 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
388 m_pid_hrecv, m_pid_hrelay);
392void RFEventProcessor::cleanup()
394 printf(
"RFEventProcessor : cleaning up\n");
395 UnConfigure(NULL, NULL);
396 printf(
"RFEventProcessor: Done. Exitting\n");
int Execute(char *script, int nargs, char **args)
Class to manage a ring buffer placed in IPC shared memory.
void forceClear()
Forcefully clear the RingBuffer and reset its semaphore.
Abstract base class for different kinds of events.