10 #include "daq/rfarm/manager/RFOutputServer.h"
13 #include <sys/types.h>
21 #define RFOTSOUT stdout
// Constructor: builds the output server from the configuration file `conffile`.
// It loads the configuration, switches to the collector's execution directory,
// creates the input and output ring buffers, the log manager and the
// flow-statistics object, and zeroes the receiver PID table.
26 RFOutputServer::RFOutputServer(
string conffile)
// Load the configuration; m_conf is consulted by every step below.
29 m_conf =
new RFConf(conffile.c_str());
// Node name of the collector; used below as the identity given to the log manager.
30 char* nodename = m_conf->getconf(
"collector",
"nodename");
// Working directory is "<system.execdir_base>/collector".
35 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/collector";
// Create the directory (mode 0755) and make it the current directory.
// NOTE(review): the return values of mkdir() and chdir() are not checked.
37 mkdir(execdir.c_str(), 0755);
38 chdir(execdir.c_str());
// Name handed to RFFlowStat below; it starts with "<system.unitname>:".
41 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
// Input ring buffer: named "<system.unitname>:<collector.ringbufin>",
// sized by collector.ringbufinsize.
47 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
48 string(m_conf->getconf(
"collector",
"ringbufin"));
49 int rbinsize = m_conf->getconfi(
"collector",
"ringbufinsize");
50 m_rbufin =
new RingBuffer(rbufin.c_str(), rbinsize);
// Output ring buffer: named "<system.unitname>:<collector.ringbufout>",
// sized by collector.ringbufoutsize.
52 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
53 string(m_conf->getconf(
"collector",
"ringbufout"));
54 int rboutsize = m_conf->getconfi(
"collector",
"ringbufoutsize");
55 m_rbufout =
new RingBuffer(rbufout.c_str(), rboutsize);
// Log manager, constructed with this node's name and the configured system.lognode.
61 m_log =
new RFLogManager(nodename, m_conf->getconf(
"system",
"lognode"));
// Flow-statistics object keyed by shmname (cast drops const for the callee's char* parameter).
64 m_flow =
new RFFlowStat((
char*)shmname.c_str());
// Start with no receiver processes recorded: one slot per configured processor node.
// NOTE(review): assumes m_pid_receiver has at least processor.nnodes entries - confirm in the header.
69 m_nnodes = m_conf->getconfi(
"processor",
"nnodes");
70 for (
int i = 0; i < m_nnodes; i++)
71 m_pid_receiver[i] = 0;
// Destructor of RFOutputServer.
75 RFOutputServer::~RFOutputServer()
// Configure step: derive the ring buffer and flow-statistics names from the
// configuration, then launch the child processes through m_proc->Execute():
// the sender or file writer, the basf2 script, and one receiver per usable
// processor node. Both ring buffers are cleared at the end.
// Ring buffer names, built the same way as in the constructor:
// "<system.unitname>:<collector.ringbufin>" and "<system.unitname>:<collector.ringbufout>".
95 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
96 string(m_conf->getconf(
"collector",
"ringbufin"));
98 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
99 string(m_conf->getconf(
"collector",
"ringbufout"));
// Name passed to the sender and receiver scripts: "<system.unitname>:<collector.nodename>".
102 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
103 string(m_conf->getconf(
"collector",
"nodename"));
// Histogram receiver settings. Of these, hport is passed to the basf2 script below.
106 char* hrecv = m_conf->getconf(
"collector",
"historecv",
"script");
107 char* hport = m_conf->getconf(
"collector",
"historecv",
"port");
108 char* mapfile = m_conf->getconf(
"collector",
"historecv",
"mapfile");
// Histogram relay settings and the DQM server endpoint.
112 char* hrelay = m_conf->getconf(
"collector",
"historelay",
"script");
113 char* dqmdest = m_conf->getconf(
"dqmserver",
"host");
114 char* dqmport = m_conf->getconf(
"dqmserver",
"port");
115 char* interval = m_conf->getconf(
"collector",
"historelay",
"interval");
// collector.destination selects the output stage by substring match:
// containing "net" starts the sender script, containing "file" starts the writer script.
119 char* src = m_conf->getconf(
"collector",
"destination");
120 if (strstr(src,
"net") != 0) {
122 char* sender = m_conf->getconf(
"collector",
"sender",
"script");
123 char* port = m_conf->getconf(
"collector",
"sender",
"port");
// RF_OUTPUT_ID formatted as at least two digits, passed to the sender as its last argument.
125 sprintf(idbuf,
"%2.2d", RF_OUTPUT_ID);
126 m_pid_sender = m_proc->Execute(sender, (
char*)rbufout.c_str(), port, (
char*)shmname.c_str(), idbuf);
// Clear the flow-statistics entry for the output stage.
127 m_flow->clear(RF_OUTPUT_ID);
128 }
else if (strstr(src,
"file") != 0) {
130 char* writer = m_conf->getconf(
"collector",
"writer",
"script");
131 char* file = m_conf->getconf(
"collector",
"writer",
"filename");
// processor.nnodes is read as a string here because it is forwarded as a script argument.
132 char* nnode = m_conf->getconf(
"processor",
"nnodes");
133 m_pid_sender = m_proc->Execute(writer, (
char*)rbufout.c_str(), file, nnode);
// basf2 script path from the configuration; the next assignment replaces it
// with the payload of the NSM message (nsmm->datap).
// NOTE(review): the condition guarding the override is not visible here - confirm.
139 char* basf2 = m_conf->getconf(
"collector",
"basf2",
"script");
141 basf2 = (
char*) nsmm->datap;
142 printf(
"Configure: basf2 script overridden : %s\n", basf2);
// Launch basf2 with the input buffer, the output buffer and the histogram port as arguments.
144 m_pid_basf2 = m_proc->Execute(basf2, (
char*)rbufin.c_str(), (
char*)rbufout.c_str(), hport);
// Receivers: iterate over processor.nnodes candidate nodes numbered from processor.idbase.
148 int maxnodes = m_conf->getconfi(
"processor",
"nnodes");
149 int idbase = m_conf->getconfi(
"processor",
"idbase");
151 char* hostbase = m_conf->getconf(
"processor",
"hostbase");
152 char* badlist = m_conf->getconf(
"processor",
"badlist");
153 char* port = m_conf->getconf(
"processor",
"sender",
"port");
155 char* receiver = m_conf->getconf(
"collector",
"receiver",
"script");
// NOTE(review): idname and shmid hold two characters plus the terminator; "%2.2d"
// writes more than that for values of 100 or above (or negative) - confirm the id range.
156 char hostname[512], idname[3], shmid[3];
157 for (
int i = 0; i < maxnodes; i++) {
// idname is the node id (idbase + i); shmid is the zero-based loop index.
158 sprintf(idname,
"%2.2d", idbase + i);
159 sprintf(shmid,
"%2.2d", i);
// Skip a node when its two-digit id occurs anywhere in the bad list (substring match).
160 if (badlist == NULL ||
161 strstr(badlist, idname) == 0) {
// Host name is "<hostbase><two-digit id>".
162 sprintf(hostname,
"%s%2.2d", hostbase, idbase + i);
// NOTE(review): the PID is stored at index m_nnodes, not i; presumably m_nnodes is
// reset and advanced around this statement - confirm.
163 m_pid_receiver[m_nnodes] = m_proc->Execute(receiver, (
char*)rbufin.c_str(), hostname, port, (
char*)shmname.c_str(), shmid);
// Clear both ring buffers once the child processes have been launched.
169 m_rbufin->forceClear();
170 m_rbufout->forceClear();
// Un-configure step: stop every child process whose PID is recorded
// (sender, basf2, receivers), then clear both ring buffers and update the
// process status in the flow statistics.
// Each child is sent SIGINT and then reaped with a blocking waitpid().
// Print the recorded PIDs before signalling anything.
178 printf(
"m_pid_sender = %d\n", m_pid_sender);
179 printf(
"m_pid_basf2 = %d\n", m_pid_basf2);
// Sender (or file writer): only signalled when a PID is recorded.
182 if (m_pid_sender != 0) {
183 printf(
"killing sender %d\n", m_pid_sender);
185 kill(m_pid_sender, SIGINT);
186 ws = waitpid(m_pid_sender, &status, 0);
187 printf(
"wait return = %d, status = %d\n", ws, status);
// basf2: the message now names basf2 and prints m_pid_basf2, the process that
// is actually signalled (it previously reported the sender and its PID).
190 if (m_pid_basf2 != 0) {
191 printf(
"killing basf2 %d\n", m_pid_basf2);
192 kill(m_pid_basf2, SIGINT);
193 ws = waitpid(m_pid_basf2, &status, 0);
194 printf(
"wait return = %d, status = %d\n", ws, status);
// Receivers: walk the first m_nnodes slots and stop every one holding a non-zero PID.
197 for (
int i = 0; i < m_nnodes; i++) {
198 if (m_pid_receiver[i] != 0) {
199 printf(
"killing receiver %d\n", m_pid_receiver[i]);
200 kill(m_pid_receiver[i], SIGINT);
201 ws = waitpid(m_pid_receiver[i], &status, 0);
202 printf(
"wait return = %d, status = %d\n", ws, status);
// Clear both ring buffers after the child processes have been stopped.
207 m_rbufin->forceClear();
208 m_rbufout->forceClear();
// Update the process status held by the flow statistics.
211 m_flow->fillProcessStatus(GetNodeInfo());
213 printf(
"Unconfigure done\n");
// Clear the output ring buffer.
// NOTE(review): exact semantics of RingBuffer::forceClear() are defined elsewhere - confirm.
221 m_rbufout->forceClear();
// Restart: announce it, then call UnConfigure followed by Configure,
// passing the same NSM message and context to both.
234 printf(
"RFOutputServer : Restarting!!!!!!\n");
247 RFOutputServer::UnConfigure(nsmmsg, nsmcontext);
249 RFOutputServer::Configure(nsmmsg, nsmcontext);
// Server routine: reports dead child processes, forwards child output to the
// log manager, and updates the flow statistics for this node.
255 void RFOutputServer::server()
257 m_flow->fillProcessStatus(GetNodeInfo());
// Ask the process manager for a child that has exited; pid is compared against
// the recorded sender, basf2 and receiver PIDs below.
260 pid_t pid = m_proc->CheckProcess();
262 printf(
"RFOutputServer : process dead. pid=%d\n", pid);
// Report which child died through the log manager at Fatal level.
263 if (pid == m_pid_sender) {
264 m_log->Fatal(
"RFOutputServer : sender process dead. pid=%d\n", pid);
266 }
else if (pid == m_pid_basf2) {
267 m_log->Fatal(
"RFOutputServer : basf2 process dead. pid=%d\n", pid);
// Otherwise look for the PID among the receivers and clear the matching slot.
270 for (
int i = 0; i < m_nnodes; i++) {
271 if (pid == m_pid_receiver[i]) {
272 m_log->Fatal(
"RFOutputServer : receiver process %d dead. pid=%d\n", i, pid);
273 m_pid_receiver[i] = 0;
// Check the child processes' output; the status is kept in st.
280 int st = m_proc->CheckOutput();
282 perror(
"RFOutputServer::server");
// Hand the process manager's file descriptor to the log manager.
285 m_log->ProcessLog(m_proc->GetFd());
// Update the flow statistics: node info for the output stage, then process status.
287 m_flow->fillNodeInfo(RF_OUTPUT_ID, GetNodeInfo(),
true);
288 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver[recv_id], m_pid_sender,
// Cleanup: runs UnConfigure with null message/context arguments,
// printing a message before and after.
292 void RFOutputServer::cleanup()
294 printf(
"RFOutputServer : cleaning up\n");
// No NSM message or context is available here, so both arguments are NULL.
295 UnConfigure(NULL, NULL);
296 printf(
"RFOutputServer: Done. Exitting\n");