9 #include "daq/expreco/ERecoEventProcessor.h"
13 #include <sys/types.h>
20 #define RFOTSOUT stdout
// Constructor. Takes the path of the configuration file and sets up the
// members used by the rest of this class (m_conf first, since every later
// getconf()/getconfi() lookup in this file goes through it).
25 ERecoEventProcessor::ERecoEventProcessor(
string conffile)
// Create the RFConf object from the configuration file path.
28 m_conf =
new RFConf(conffile.c_str());
31 strcpy(m_nodename,
"evp_");
33 gethostname(&m_nodename[4],
sizeof(m_nodename));
34 printf(
"nodename = %s\n", m_nodename);
37 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/" + string(m_nodename);
38 printf(
"execdir = %s\n", execdir.c_str());
40 mkdir(execdir.c_str(), 0755);
41 chdir(execdir.c_str());
// Name later handed to RFFlowStat. It starts with "<system/unitname>:".
// NOTE(review): the rest of this expression is on a line that is not visible
// in this excerpt.
44 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
// Input ring buffer name: "<system/unitname>:<processor/ringbufin>".
53 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
54 string(m_conf->getconf(
"processor",
"ringbufin"));
// Second RingBuffer constructor argument, read from processor/ringbufinsize.
55 int rbinsize = m_conf->getconfi(
"processor",
"ringbufinsize");
57 m_rbufin =
new RingBuffer(rbufin.c_str(), rbinsize);
// Output ring buffer, built the same way from processor/ringbufout and
// processor/ringbufoutsize.
59 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
60 string(m_conf->getconf(
"processor",
"ringbufout"));
61 int rboutsize = m_conf->getconfi(
"processor",
"ringbufoutsize");
63 m_rbufout =
new RingBuffer(rbufout.c_str(), rboutsize);
// Log manager, constructed from this node's name and the system/lognode
// configuration entry.
66 m_log =
new RFLogManager(m_nodename, m_conf->getconf(
"system",
"lognode"));
// Flow statistics object, constructed from shmname. The cast drops const
// because RFFlowStat is called here with a char* argument.
69 m_flow =
new RFFlowStat((
char*)shmname.c_str());
// Destructor.
// NOTE(review): the body is not visible in this excerpt.
73 ERecoEventProcessor::~ERecoEventProcessor()
// Rebuild the ring buffer names from the configuration, using the same
// "<system/unitname>:<processor/ringbufin|ringbufout>" scheme as the
// constructor. They are passed to the child processes started below.
93 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
94 string(m_conf->getconf(
"processor",
"ringbufin"));
96 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
97 string(m_conf->getconf(
"processor",
"ringbufout"));
// Name passed to the receiver process below. It starts with "<system/unitname>:".
// NOTE(review): the rest of this expression is on a line that is not visible
// in this excerpt.
99 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
// Histogram receiver: script, port and map file all come from the
// processor/historecv configuration section.
103 char* hrecv = m_conf->getconf(
"processor",
"historecv",
"script");
104 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
105 char* mapfile = m_conf->getconf(
"processor",
"historecv",
"mapfile");
// Start the script with (port, mapfile); the returned pid is kept so the
// process can be signalled and monitored later.
106 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
// Histogram relay: script and interval come from processor/historelay,
// destination host and port from the dqmserver section.
111 char* hrelay = m_conf->getconf(
"processor",
"historelay",
"script");
112 char* dqmdest = m_conf->getconf(
"dqmserver",
"host");
113 char* dqmport = m_conf->getconf(
"dqmserver",
"port");
114 char* interval = m_conf->getconf(
"processor",
"historelay",
"interval");
// Start the script with (mapfile, host, port, interval); it is given the same
// map file as the histogram receiver above.
115 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
// Receiver: script from processor/receiver, source host from the distributor
// section.
128 char* receiver = m_conf->getconf(
"processor",
"receiver",
"script");
129 char* srchost = m_conf->getconf(
"distributor",
"host");
// NOTE(review): portbase, hostbase and baselen are not referenced by any line
// visible in this excerpt; they may be used on the missing lines.
131 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
132 char* hostbase = m_conf->getconf(
"processor",
"nodebase");
133 int baselen = strlen(hostbase);
// Parse a decimal number from the last two characters of the node name into
// rport.
// NOTE(review): assumes the node name ends in two digits — confirm; rport is
// declared on a line not visible here.
135 sscanf(&m_nodename[strlen(m_nodename) - 2],
"%d", &rport);
// Execute() takes string arguments, so format the port number as text.
// NOTE(review): portchar is declared on a line not visible here; its size
// cannot be checked from this excerpt.
138 sprintf(portchar,
"%d", rport);
// Start the receiver with (input ring buffer, source host, port, shm name, "0").
139 m_pid_receiver = m_proc->Execute(receiver, (
char*)rbufin.c_str(), srchost, portchar, (
char*)shmname.c_str(), (
char*)
"0");
// Event server: script and port come from processor/eventserver.
142 char* evs = m_conf->getconf(
"processor",
"eventserver",
"script");
143 char* evsport = m_conf->getconf(
"processor",
"eventserver",
"port");
// Start the event server with (output ring buffer, port).
144 m_pid_evs = m_proc->Execute(evs, (
char*)rbufout.c_str(), evsport);
146 printf(
"Configure : done\n");
157 if (m_pid_sender != 0) {
158 printf(
"ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
159 kill(m_pid_sender, SIGINT);
160 waitpid(m_pid_sender, &status, 0);
162 if (m_pid_basf2 != 0) {
163 printf(
"ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
164 kill(m_pid_basf2, SIGINT);
165 waitpid(m_pid_basf2, &status, 0);
167 if (m_pid_receiver != 0) {
168 printf(
"ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
169 kill(m_pid_receiver, SIGINT);
170 waitpid(m_pid_receiver, &status, 0);
172 if (m_pid_hrecv != 0) {
173 printf(
"ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
174 kill(m_pid_hrecv, SIGINT);
175 waitpid(m_pid_hrecv, &status, 0);
177 if (m_pid_hrelay != 0) {
178 printf(
"ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
179 kill(m_pid_hrelay, SIGINT);
180 waitpid(m_pid_hrelay, &status, 0);
182 if (m_pid_evs != 0) {
183 printf(
"ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
184 kill(m_pid_evs, SIGINT);
185 waitpid(m_pid_evs, &status, 0);
// Call forceClear() on both ring buffers.
189 m_rbufin->forceClear();
190 m_rbufout->forceClear();
// Refresh the process status with no pids supplied (compare the call at the
// end of server(), which passes the child pids).
193 m_flow->fillProcessStatus(GetNodeInfo());
195 printf(
"Unconfigure : done\n");
// Command line used to clear the DQM histograms.
// NOTE(review): the statement that runs cmdline is not visible in this
// excerpt.
203 char cmdline[] =
"hsendcommand DQMRC:CLEAR localhost 10391";
205 printf(
"ERecoEventProcessor : DQM TMemFile cleared\n");
// Ring buffer names, built with the same
// "<system/unitname>:<processor/ringbufin|ringbufout>" scheme used in the
// constructor.
209 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
210 string(m_conf->getconf(
"processor",
"ringbufin"));
211 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
212 string(m_conf->getconf(
"processor",
"ringbufout"));
// Histogram receiver port, passed to basf2 as its last argument.
213 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
// Default basf2 script from the configuration.
217 char* basf2 = m_conf->getconf(
"processor",
"basf2",
"script");
// Replace the script with the data pointer carried by nsmm.
// NOTE(review): the condition guarding this override is on a line not visible
// in this excerpt.
219 basf2 = (
char*) nsmm->datap;
220 printf(
"Configure: basf2 script overridden : %s\n", basf2);
// Start basf2 with (input ring buffer, output ring buffer, histogram port).
222 m_pid_basf2 = m_proc->Execute(basf2, (
char*)rbufin.c_str(), (
char*)rbufout.c_str(), hport);
238 if (m_pid_basf2 != 0) {
239 printf(
"RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
240 kill(m_pid_basf2, SIGINT);
241 waitpid(m_pid_basf2, &status, 0);
// Restart: call UnConfigure followed by Configure, forwarding the same NSM
// message and context to both.
// NOTE(review): many original lines between the message and these calls are
// not visible in this excerpt.
250 printf(
"ERecoEventProcessor : Restarting!!!!!\n");
278 ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
280 ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
// Monitoring routine: checks for dead child processes, processes child
// output through the log manager, and refreshes the flow/status information.
// NOTE(review): loop structure, braces and the guards around several
// statements are on lines not visible in this excerpt.
286 void ERecoEventProcessor::server()
// Initial status fill with no pids supplied.
289 m_flow->fillProcessStatus(GetNodeInfo());
// Ask the process manager for a child that has exited.
292 pid_t pid = m_proc->CheckProcess();
294 printf(
"ERecoEventProcessor : process dead pid=%d\n", pid);
// Identify which child died and report it through the log manager's Fatal().
// NOTE(review): m_pid_evs is not compared in any visible branch — confirm
// whether event server death should be reported as well.
295 if (pid == m_pid_sender) {
296 m_log->Fatal(
"ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
298 }
else if (pid == m_pid_basf2) {
299 m_log->Fatal(
"ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
301 }
else if (pid == m_pid_receiver) {
302 m_log->Fatal(
"ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
304 }
else if (pid == m_pid_hrecv) {
305 m_log->Fatal(
"ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
307 }
else if (pid == m_pid_hrelay) {
308 m_log->Fatal(
"ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
// Check the process manager for pending child output.
// NOTE(review): the tests on st that select between the perror() and
// ProcessLog() calls below are not visible in this excerpt.
312 int st = m_proc->CheckOutput();
314 perror(
"ERecoEventProcessor::server");
// Hand the process manager's file descriptor to the log manager.
317 m_log->ProcessLog(m_proc->GetFd());
// Fill node info slots 0 and 1 (last argument false and true respectively).
319 m_flow->fillNodeInfo(0, GetNodeInfo(),
false);
320 m_flow->fillNodeInfo(1, GetNodeInfo(),
true);
// Final status fill, this time passing the child pids.
321 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
322 m_pid_hrecv, m_pid_hrelay);
Class to manage a Ring Buffer placed in an IPC shared memory.
Abstract base class for different kinds of events.