9 #include "daq/expreco/ERecoEventProcessor.h"
13 #include <sys/types.h>
20 #define RFOTSOUT stdout
// Constructor: builds the configuration object from `conffile`, derives the
// node name and working directory, attaches the input and output ring
// buffers, and creates the log manager and flow-statistics objects.
// NOTE(review): this listing is fragmentary (the embedded line numbers skip),
// so braces and some statements between the visible lines are not shown.
25 ERecoEventProcessor::ERecoEventProcessor(
string conffile)
// Configuration object created from the given file path.
28 m_conf =
new RFConf(conffile.c_str());
// Node name is "evp_" followed by the local host name.
// NOTE(review): POSIX leaves NUL termination unspecified when gethostname()
// truncates; the size of m_nodename is declared elsewhere -- confirm it is
// large enough for the host names in use.
31 strcpy(m_nodename,
"evp_");
33 gethostname(&m_nodename[4],
sizeof(m_nodename) - 4);
34 printf(
"nodename = %s\n", m_nodename);
// Working directory is "<system/execdir_base>/<nodename>".
37 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/" + string(m_nodename);
38 printf(
"execdir = %s\n", execdir.c_str());
// Create the directory and change into it.
// NOTE(review): the return values of mkdir() and chdir() are ignored here, so
// a failure to enter the directory goes unreported.
40 mkdir(execdir.c_str(), 0755);
41 chdir(execdir.c_str());
// Shared-memory name begins with "<system/unitname>:"; the remainder of this
// expression is not visible in this listing.
44 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
// Input ring buffer: named "<system/unitname>:<processor/ringbufin>" and
// sized by processor/ringbufinsize.
53 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
54 string(m_conf->getconf(
"processor",
"ringbufin"));
55 int rbinsize = m_conf->getconfi(
"processor",
"ringbufinsize");
57 m_rbufin =
new RingBuffer(rbufin.c_str(), rbinsize);
// Output ring buffer: named "<system/unitname>:<processor/ringbufout>" and
// sized by processor/ringbufoutsize.
59 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
60 string(m_conf->getconf(
"processor",
"ringbufout"));
61 int rboutsize = m_conf->getconfi(
"processor",
"ringbufoutsize");
63 m_rbufout =
new RingBuffer(rbufout.c_str(), rboutsize);
// Log manager for this node; the second argument is the system/lognode setting.
66 m_log =
new RFLogManager(m_nodename, m_conf->getconf(
"system",
"lognode"));
// Flow-statistics object built from the shared-memory name (the cast drops
// const because RFFlowStat is called with a char* here).
69 m_flow =
new RFFlowStat((
char*)shmname.c_str());
73 ERecoEventProcessor::~ERecoEventProcessor()
// Body fragment of the configure step (the function signature is outside this
// listing; the closing message below prints "Configure : done"). It launches
// the histogram receiver, histogram relay, event receiver and event server
// through m_proc->Execute and stores the returned values in m_pid_*.
// Ring buffer names: "<system/unitname>:<processor/ringbufin|ringbufout>".
93 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
94 string(m_conf->getconf(
"processor",
"ringbufin"));
96 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
97 string(m_conf->getconf(
"processor",
"ringbufout"));
// Shared-memory name begins with "<system/unitname>:"; the remainder of this
// expression is not visible in this listing.
99 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
// Histogram receiver: script, port and map file come from processor/historecv.
103 char* hrecv = m_conf->getconf(
"processor",
"historecv",
"script");
104 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
105 char* mapfile = m_conf->getconf(
"processor",
"historecv",
"mapfile");
106 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
// Histogram relay: script and interval come from processor/historelay, the
// destination host/port from dqmserver; it is given the same map file as the
// histogram receiver.
111 char* hrelay = m_conf->getconf(
"processor",
"historelay",
"script");
112 char* dqmdest = m_conf->getconf(
"dqmserver",
"host");
113 char* dqmport = m_conf->getconf(
"dqmserver",
"port");
114 char* interval = m_conf->getconf(
"processor",
"historelay",
"interval");
115 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
// Event receiver: script from processor/receiver, source host from
// distributor/host, port base from distributor/sender/portbase.
128 char* receiver = m_conf->getconf(
"processor",
"receiver",
"script");
129 char* srchost = m_conf->getconf(
"distributor",
"host");
131 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
// Parse the last two characters of the node name as a decimal number into rport.
// NOTE(review): the sscanf() result is not checked, and how portbase is
// combined with rport is not visible in this listing -- confirm.
134 sscanf(&m_nodename[strlen(m_nodename) - 2],
"%d", &rport);
// Format the port number as text for the receiver's command line.
// NOTE(review): the size of portchar is declared outside the visible lines.
137 sprintf(portchar,
"%d", rport);
// Receiver arguments: input ring buffer, source host, port, shared-memory
// name, and the literal "0".
138 m_pid_receiver = m_proc->Execute(receiver, (
char*)rbufin.c_str(), srchost, portchar, (
char*)shmname.c_str(), (
char*)
"0");
// Event server: script and port from processor/eventserver; it is given the
// output ring buffer name.
141 char* evs = m_conf->getconf(
"processor",
"eventserver",
"script");
142 char* evsport = m_conf->getconf(
"processor",
"eventserver",
"port");
143 m_pid_evs = m_proc->Execute(evs, (
char*)rbufout.c_str(), evsport);
145 printf(
"Configure : done\n");
// Body fragment of the unconfigure step (the function signature and the
// closing braces are outside this listing; the closing message below prints
// "Unconfigure : done"). For every process whose stored pid is non-zero it
// sends SIGINT and then blocks in waitpid() until that process has exited.
// Afterwards both ring buffers are force-cleared and the process status is
// refreshed.
// Sender.
156 if (m_pid_sender != 0) {
157 printf(
"ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
158 kill(m_pid_sender, SIGINT);
159 waitpid(m_pid_sender, &status, 0);
// basf2.
161 if (m_pid_basf2 != 0) {
162 printf(
"ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
163 kill(m_pid_basf2, SIGINT);
164 waitpid(m_pid_basf2, &status, 0);
// Event receiver.
166 if (m_pid_receiver != 0) {
167 printf(
"ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
168 kill(m_pid_receiver, SIGINT);
169 waitpid(m_pid_receiver, &status, 0);
// Histogram receiver (logged as "hserver").
171 if (m_pid_hrecv != 0) {
172 printf(
"ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
173 kill(m_pid_hrecv, SIGINT);
174 waitpid(m_pid_hrecv, &status, 0);
// Histogram relay.
176 if (m_pid_hrelay != 0) {
177 printf(
"ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
178 kill(m_pid_hrelay, SIGINT);
179 waitpid(m_pid_hrelay, &status, 0);
// Event server: the message names the process and pid that are actually
// signalled below.
181 if (m_pid_evs != 0) {
182 printf(
"ERecoEventProcessor : killing evs pid=%d\n", m_pid_evs);
183 kill(m_pid_evs, SIGINT);
184 waitpid(m_pid_evs, &status, 0);
// Discard whatever is left in the input and output ring buffers.
188 m_rbufin->forceClear();
189 m_rbufout->forceClear();
// Refresh the process status with no pids passed.
192 m_flow->fillProcessStatus(GetNodeInfo());
194 printf(
"Unconfigure : done\n");
// Body fragment of the step that launches basf2 (the function signature is
// outside this listing). It prepares a DQM clear command, rebuilds the ring
// buffer names, picks the basf2 script and starts it.
// Command line for clearing the DQM histograms; host "localhost" and port
// 10391 are hard-coded. The call that runs this command is not visible in
// this listing.
202 char cmdline[] =
"hsendcommand DQMRC:CLEAR localhost 10391";
204 printf(
"ERecoEventProcessor : DQM TMemFile cleared\n");
// Ring buffer names: "<system/unitname>:<processor/ringbufin|ringbufout>".
208 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
209 string(m_conf->getconf(
"processor",
"ringbufin"));
210 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
211 string(m_conf->getconf(
"processor",
"ringbufout"));
// Histogram receiver port, passed on to basf2 below.
212 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
// Default basf2 script comes from processor/basf2/script.
216 char* basf2 = m_conf->getconf(
"processor",
"basf2",
"script");
// The script can be replaced by the data pointer of the NSM message.
// NOTE(review): the condition guarding this override is not visible here.
218 basf2 = (
char*) nsmm->datap;
219 printf(
"Configure: basf2 script overridden : %s\n", basf2);
// Start basf2 with the input ring buffer, output ring buffer and histogram port.
221 m_pid_basf2 = m_proc->Execute(basf2, (
char*)rbufin.c_str(), (
char*)rbufout.c_str(), hport);
// Fragment of the step that stops basf2 (the function signature and closing
// brace are outside this listing): if a basf2 pid is recorded, send SIGINT
// and block until the process has exited. The log prefix uses this class's
// name, like every other message in this file.
237 if (m_pid_basf2 != 0) {
238 printf(
"ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
239 kill(m_pid_basf2, SIGINT);
240 waitpid(m_pid_basf2, &status, 0);
// Fragment of the restart step (the function signature is outside this
// listing, and the embedded line numbers show a large gap between the message
// and the calls below). The visible part announces the restart and then runs
// UnConfigure followed by Configure with the same NSM message and context.
249 printf(
"ERecoEventProcessor : Restarting!!!!!\n");
277 ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
279 ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
// Monitoring routine: refreshes the process status, asks the process manager
// for a dead process and logs a fatal message naming which one it was, checks
// for process output and hands it to the log manager, then updates the flow
// statistics.
// NOTE(review): braces, any enclosing loop and the conditions around the
// printf()/perror() calls are not visible in this listing.
285 void ERecoEventProcessor::server()
// Initial status refresh with no pids passed.
288 m_flow->fillProcessStatus(GetNodeInfo());
// pid reported by the process manager; compared below against each stored pid.
291 pid_t pid = m_proc->CheckProcess();
293 printf(
"ERecoEventProcessor : process dead pid=%d\n", pid);
// Identify which process died and report it as fatal.
// NOTE(review): m_pid_evs is not among the pids compared in the visible
// chain -- confirm whether an event-server death should be reported too.
294 if (pid == m_pid_sender) {
295 m_log->Fatal(
"ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
297 }
else if (pid == m_pid_basf2) {
298 m_log->Fatal(
"ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
300 }
else if (pid == m_pid_receiver) {
301 m_log->Fatal(
"ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
303 }
else if (pid == m_pid_hrecv) {
304 m_log->Fatal(
"ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
306 }
else if (pid == m_pid_hrelay) {
307 m_log->Fatal(
"ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
// Check for output from the managed processes; perror() presumably reports a
// failed check (the test on `st` is not visible here).
311 int st = m_proc->CheckOutput();
313 perror(
"ERecoEventProcessor::server");
// Pass the process manager's file descriptor to the log manager.
316 m_log->ProcessLog(m_proc->GetFd());
// Update node info entries 0 and 1 (called with false and true respectively),
// then the process status including the pids passed below.
318 m_flow->fillNodeInfo(0, GetNodeInfo(),
false);
319 m_flow->fillNodeInfo(1, GetNodeInfo(),
true);
320 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
321 m_pid_hrecv, m_pid_hrelay);
Class to manage a ring buffer placed in IPC shared memory.
Abstract base class for different kinds of events.