10 #include "daq/expreco/ERecoEventProcessor.h"
14 #include <sys/types.h>
21 #define RFOTSOUT stdout
26 ERecoEventProcessor::ERecoEventProcessor(
string conffile)
29 m_conf =
new RFConf(conffile.c_str());
32 strcpy(m_nodename,
"evp_");
34 gethostname(&m_nodename[4],
sizeof(m_nodename));
35 printf(
"nodename = %s\n", m_nodename);
38 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/" + string(m_nodename);
39 printf(
"execdir = %s\n", execdir.c_str());
41 mkdir(execdir.c_str(), 0755);
42 chdir(execdir.c_str());
45 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
54 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
55 string(m_conf->getconf(
"processor",
"ringbufin"));
56 int rbinsize = m_conf->getconfi(
"processor",
"ringbufinsize");
58 m_rbufin =
new RingBuffer(rbufin.c_str(), rbinsize);
60 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
61 string(m_conf->getconf(
"processor",
"ringbufout"));
62 int rboutsize = m_conf->getconfi(
"processor",
"ringbufoutsize");
64 m_rbufout =
new RingBuffer(rbufout.c_str(), rboutsize);
67 m_log =
new RFLogManager(m_nodename, m_conf->getconf(
"system",
"lognode"));
70 m_flow =
new RFFlowStat((
char*)shmname.c_str());
74 ERecoEventProcessor::~ERecoEventProcessor()
// Body of what appears to be ERecoEventProcessor::Configure -- the signature
// line is not visible in this extract; the name is inferred from the
// "Configure : done" message at the end (TODO confirm).
// Launches the helper processes through m_proc->Execute() and stores the
// returned values in the m_pid_* members.
// NOTE(review): braces and several source lines are missing from this
// listing, so any surrounding conditionals are not shown.

// Ring buffer names: "<system/unitname>:<processor/ringbufin|ringbufout>".
94 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
95 string(m_conf->getconf(
"processor",
"ringbufin"));
97 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
98 string(m_conf->getconf(
"processor",
"ringbufout"));
// NOTE(review): the tail of this shmname expression is not visible here.
100 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
// Histogram receiver: script, port and mapfile come from
// processor/historecv/*.
104 char* hrecv = m_conf->getconf(
"processor",
"historecv",
"script");
105 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
106 char* mapfile = m_conf->getconf(
"processor",
"historecv",
"mapfile");
107 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
// Histogram relay: reuses mapfile from above; destination host/port come
// from the dqmserver section, interval from processor/historelay/interval.
112 char* hrelay = m_conf->getconf(
"processor",
"historelay",
"script");
113 char* dqmdest = m_conf->getconf(
"dqmserver",
"host");
114 char* dqmport = m_conf->getconf(
"dqmserver",
"port");
115 char* interval = m_conf->getconf(
"processor",
"historelay",
"interval");
116 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
// Receiver: script from processor/receiver/script, source host from
// distributor/host.
129 char* receiver = m_conf->getconf(
"processor",
"receiver",
"script");
130 char* srchost = m_conf->getconf(
"distributor",
"host");
// NOTE(review): portbase, hostbase and baselen are read here but are not
// used by any statement visible in this extract -- confirm against the
// full source whether they are dead.
132 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
133 char* hostbase = m_conf->getconf(
"processor",
"nodebase");
134 int baselen = strlen(hostbase);
// rport is parsed as a decimal integer from the last two characters of
// m_nodename. The declarations of rport and portchar are not visible here.
136 sscanf(&m_nodename[strlen(m_nodename) - 2],
"%d", &rport);
// rport is formatted back into text and passed as the port argument below.
139 sprintf(portchar,
"%d", rport);
140 m_pid_receiver = m_proc->Execute(receiver, (
char*)rbufin.c_str(), srchost, portchar, (
char*)shmname.c_str(), (
char*)
"0");
// Event server: given the output ring buffer name and
// processor/eventserver/port.
143 char* evs = m_conf->getconf(
"processor",
"eventserver",
"script");
144 char* evsport = m_conf->getconf(
"processor",
"eventserver",
"port");
145 m_pid_evs = m_proc->Execute(evs, (
char*)rbufout.c_str(), evsport);
147 printf(
"Configure : done\n");
// Body of what appears to be ERecoEventProcessor::UnConfigure -- the
// signature line is not visible in this extract; the name is inferred from
// the "Unconfigure : done" message at the end (TODO confirm).
// For every child whose stored pid is non-zero: log it, send SIGINT and
// block in waitpid() until it exits. Afterwards both ring buffers are
// force-cleared and the process status is refreshed.
// NOTE(review): the closing braces of the if-blocks and the declaration of
// status are on lines missing from this listing.
158 if (m_pid_sender != 0) {
159 printf(
"ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
160 kill(m_pid_sender, SIGINT);
161 waitpid(m_pid_sender, &status, 0);
163 if (m_pid_basf2 != 0) {
164 printf(
"ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
165 kill(m_pid_basf2, SIGINT);
166 waitpid(m_pid_basf2, &status, 0);
168 if (m_pid_receiver != 0) {
169 printf(
"ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
170 kill(m_pid_receiver, SIGINT);
171 waitpid(m_pid_receiver, &status, 0);
173 if (m_pid_hrecv != 0) {
174 printf(
"ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
175 kill(m_pid_hrecv, SIGINT);
176 waitpid(m_pid_hrecv, &status, 0);
178 if (m_pid_hrelay != 0) {
179 printf(
"ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
180 kill(m_pid_hrelay, SIGINT);
181 waitpid(m_pid_hrelay, &status, 0);
183 if (m_pid_evs != 0) {
// Report the process that is actually being killed here (the event server);
// this message previously named hrelay and printed m_pid_hrelay.
184 printf(
"ERecoEventProcessor : killing evs pid=%d\n", m_pid_evs);
185 kill(m_pid_evs, SIGINT);
186 waitpid(m_pid_evs, &status, 0);
// Clear both ring buffers.
190 m_rbufin->forceClear();
191 m_rbufout->forceClear();
// Refresh the process status; no pids are passed in this call.
194 m_flow->fillProcessStatus(GetNodeInfo());
196 printf(
"Unconfigure : done\n");
// Fragment that launches the basf2 process; presumably the body of
// ERecoEventProcessor::Start -- the signature line is not visible in this
// extract (TODO confirm).
// Ring buffer names are rebuilt as
// "<system/unitname>:<processor/ringbufin|ringbufout>".
203 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
204 string(m_conf->getconf(
"processor",
"ringbufin"));
205 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
206 string(m_conf->getconf(
"processor",
"ringbufout"));
// Histogram receiver port, passed to basf2 as its last argument below.
207 char* hport = m_conf->getconf(
"processor",
"historecv",
"port");
// Default basf2 script from processor/basf2/script.
211 char* basf2 = m_conf->getconf(
"processor",
"basf2",
"script");
// The script is replaced by the data carried in nsmm.
// NOTE(review): presumably guarded by a condition on a line missing from
// this listing -- confirm against the full source.
213 basf2 = (
char*) nsmm->datap;
214 printf(
"Configure: basf2 script overridden : %s\n", basf2);
// Launch basf2 with the input and output ring buffer names and hport.
216 m_pid_basf2 = m_proc->Execute(basf2, (
char*)rbufin.c_str(), (
char*)rbufout.c_str(), hport);
// If a basf2 pid is recorded, send it SIGINT and block in waitpid() until
// it exits. Presumably part of ERecoEventProcessor::Stop -- the signature,
// the declaration of status and the closing brace are on lines missing from
// this listing (TODO confirm).
// The log prefix names this class (ERecoEventProcessor), consistent with
// the other messages in this file; it previously said RFEventProcessor.
232 if (m_pid_basf2 != 0) {
233 printf(
"ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
234 kill(m_pid_basf2, SIGINT);
235 waitpid(m_pid_basf2, &status, 0);
// Restart fragment: announces the restart, then runs UnConfigure followed by
// Configure with the same nsmmsg/nsmcontext arguments.
// NOTE(review): the signature and listing lines 245-271 are not visible in
// this extract, so anything between the message and these calls is unknown.
244 printf(
"ERecoEventProcessor : Restarting!!!!!\n");
272 ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
274 ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
// Server routine: checks the child processes, logs a fatal message naming
// whichever known child has died, forwards child output to the log manager
// and updates the flow statistics.
// NOTE(review): braces, any enclosing loop and several guards are on lines
// missing from this listing; only the visible statements are described.
280 void ERecoEventProcessor::server()
283 m_flow->fillProcessStatus(GetNodeInfo());
// pid returned by CheckProcess() is compared against each stored child pid.
// NOTE(review): the message below is presumably guarded by a check on pid
// that is not visible here.
286 pid_t pid = m_proc->CheckProcess();
288 printf(
"ERecoEventProcessor : process dead pid=%d\n", pid);
// NOTE(review): m_pid_evs is not compared in the visible part of this chain.
289 if (pid == m_pid_sender) {
290 m_log->Fatal(
"ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
292 }
else if (pid == m_pid_basf2) {
293 m_log->Fatal(
"ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
295 }
else if (pid == m_pid_receiver) {
296 m_log->Fatal(
"ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
298 }
else if (pid == m_pid_hrecv) {
299 m_log->Fatal(
"ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
301 }
else if (pid == m_pid_hrelay) {
302 m_log->Fatal(
"ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
// Result of CheckOutput() is kept in st.
// NOTE(review): the condition under which perror() and ProcessLog() run is
// on lines missing from this listing.
306 int st = m_proc->CheckOutput();
308 perror(
"ERecoEventProcessor::server");
// Hand the descriptor returned by m_proc->GetFd() to the log manager.
311 m_log->ProcessLog(m_proc->GetFd());
// Fill node info for index 0 with flag false and index 1 with flag true.
313 m_flow->fillNodeInfo(0, GetNodeInfo(),
false);
314 m_flow->fillNodeInfo(1, GetNodeInfo(),
true);
// Report the receiver, sender, basf2, hrecv and hrelay pids.
// NOTE(review): m_pid_evs is not passed here.
315 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
316 m_pid_hrecv, m_pid_hrelay);