10 #include "daq/rfarm/manager/RFEventProcessor.h"
13 #include <sys/types.h>
20 #define RFOTSOUT stdout
27 RFEventProcessor::RFEventProcessor(
string conffile)
30 m_conf =
new RFConf(conffile.c_str());
33 strcpy(m_nodename,
"evp_");
35 gethostname(&m_nodename[4],
sizeof(m_nodename));
38 char hostnamebuf[256];
39 gethostname(hostnamebuf,
sizeof(hostnamebuf));
40 strcat(&m_nodename[4], &hostnamebuf[6]);
41 int lend = strlen(m_nodename);
42 m_nodename[lend + 1] = (char)0;
43 m_nodename[lend] = m_nodename[lend - 1];
44 strncpy(&m_nodename[lend - 1],
"0", 1);
47 printf(
"nodename = %s\n", m_nodename);
50 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/" + string(m_nodename);
51 printf(
"execdir = %s\n", execdir.c_str());
53 mkdir(execdir.c_str(), 0755);
54 chdir(execdir.c_str());
57 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
66 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
67 string(m_conf->getconf(
"processor",
"ringbufin"));
68 int rbinsize = m_conf->getconfi(
"processor",
"ringbufinsize");
70 m_rbufin =
new RingBuffer(rbufin.c_str(), rbinsize);
72 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
73 string(m_conf->getconf(
"processor",
"ringbufout"));
74 int rboutsize = m_conf->getconfi(
"processor",
"ringbufoutsize");
76 m_rbufout =
new RingBuffer(rbufout.c_str(), rboutsize);
79 m_log =
new RFLogManager(m_nodename, m_conf->getconf(
"system",
"lognode"));
82 m_flow =
new RFFlowStat((
char*)shmname.c_str());
93 RFEventProcessor::~RFEventProcessor()
113 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
114 string(m_conf->getconf(
"processor",
"ringbufin"));
116 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
117 string(m_conf->getconf(
"processor",
"ringbufout"));
119 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
123 char* hrecv = m_conf->getconf(
"processor",
"historecv",
"script");
124 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
125 char* mapfile = m_conf->getconf(
"processor",
"historecv",
"mapfile");
126 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
131 char* hrelay = m_conf->getconf(
"processor",
"historelay",
"script");
132 char* dqmdest = m_conf->getconf(
"dqmserver",
"host");
133 char* dqmport = m_conf->getconf(
"dqmserver",
"port");
134 char* interval = m_conf->getconf(
"processor",
"historelay",
"interval");
135 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
138 char* sender = m_conf->getconf(
"processor",
"sender",
"script");
139 char* port = m_conf->getconf(
"processor",
"sender",
"port");
140 m_pid_sender = m_proc->Execute(sender, (
char*)rbufout.c_str(), port, (
char*)shmname.c_str(), (
char*)
"1");
154 char* receiver = m_conf->getconf(
"processor",
"receiver",
"script");
155 char* srchost = m_conf->getconf(
"distributor",
"host");
157 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
158 char* hostbase = m_conf->getconf(
"processor",
"nodebase");
159 int baselen = strlen(hostbase);
168 sscanf(&m_nodename[strlen(m_nodename) - 2],
"%d", &rport);
171 sprintf(portchar,
"%d", rport);
172 m_pid_receiver = m_proc->Execute(receiver, (
char*)rbufin.c_str(), srchost, portchar, (
char*)shmname.c_str(), (
char*)
"0");
175 printf(
"Configure : done\n");
179 m_rbufin->forceClear();
180 m_rbufout->forceClear();
192 system(
"killall -9 python");
196 if (m_pid_sender != 0) {
197 printf(
"RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
198 kill(m_pid_sender, SIGINT);
199 waitpid(m_pid_sender, &status, 0);
201 if (m_pid_basf2 != 0) {
202 printf(
"RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
204 kill(m_pid_basf2, SIGINT);
205 waitpid(m_pid_basf2, &status, 0);
208 if (m_pid_receiver != 0) {
209 printf(
"RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
210 kill(m_pid_receiver, SIGINT);
211 waitpid(m_pid_receiver, &status, 0);
213 if (m_pid_hrecv != 0) {
214 printf(
"RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
215 kill(m_pid_hrecv, SIGINT);
216 waitpid(m_pid_hrecv, &status, 0);
218 if (m_pid_hrelay != 0) {
219 printf(
"RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
220 kill(m_pid_hrelay, SIGINT);
221 waitpid(m_pid_hrelay, &status, 0);
225 m_rbufin->forceClear();
226 m_rbufout->forceClear();
229 m_flow->fillProcessStatus(GetNodeInfo());
231 printf(
"Unconfigure : done\n");
238 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
239 string(m_conf->getconf(
"processor",
"ringbufin"));
240 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
241 string(m_conf->getconf(
"processor",
"ringbufout"));
242 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
245 m_expno = nsmm->pars[0];
246 m_runno = nsmm->pars[1];
249 m_rbufout->forceClear();
250 m_rbufin->forceClear();
253 char* basf2 = m_conf->getconf(
"processor",
"basf2",
"script");
255 basf2 = (
char*) nsmm->datap;
256 printf(
"Configure: basf2 script overridden : %s\n", basf2);
258 m_pid_basf2 = m_proc->Execute(basf2, (
char*)rbufin.c_str(), (
char*)rbufout.c_str(), hport);
265 printf(
"RFEventProcessor : STOP processing started\n");
298 if (m_pid_basf2 != 0) {
299 printf(
"RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
300 kill(m_pid_basf2, SIGINT);
301 waitpid(m_pid_basf2, &status, 0);
304 sprintf(outfile,
"dqm_e%4.4dr%6.6d.root", m_expno, m_runno);
305 std::rename(
"histofile.root", outfile);
306 printf(
"output file name = %s\n", outfile);
310 printf(
"RFEventProcessor : STOP processing done\n");
317 printf(
"RFEventProcessor : Restarting!!!!!\n");
345 RFEventProcessor::UnConfigure(nsmmsg, nsmcontext);
347 RFEventProcessor::Configure(nsmmsg, nsmcontext);
353 void RFEventProcessor::server()
356 m_flow->fillProcessStatus(GetNodeInfo());
359 pid_t pid = m_proc->CheckProcess();
361 printf(
"RFEventProcessor : process dead pid=%d\n", pid);
362 if (pid == m_pid_sender) {
363 m_log->Fatal(
"RFEventProcessor : sender dead. pid=%d\n", m_pid_sender);
365 }
else if (pid == m_pid_basf2) {
366 m_log->Fatal(
"RFEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
368 }
else if (pid == m_pid_receiver) {
369 m_log->Fatal(
"RFEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
371 }
else if (pid == m_pid_hrecv) {
372 m_log->Fatal(
"RFEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
374 }
else if (pid == m_pid_hrelay) {
375 m_log->Fatal(
"RFEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
379 int st = m_proc->CheckOutput();
381 perror(
"RFEventProcessor::server");
384 m_log->ProcessLog(m_proc->GetFd());
386 m_flow->fillNodeInfo(0, GetNodeInfo(),
false);
387 m_flow->fillNodeInfo(1, GetNodeInfo(),
true);
388 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
389 m_pid_hrecv, m_pid_hrelay);
393 void RFEventProcessor::cleanup()
395 printf(
"RFEventProcessor : cleaning up\n");
396 UnConfigure(NULL, NULL);
397 printf(
"RFEventProcessor: Done. Exitting\n");