9#include "daq/rfarm/manager/RFOutputServer.h"
20#define RFOTSOUT stdout
25RFOutputServer::RFOutputServer(
string conffile)
28 m_conf =
new RFConf(conffile.c_str());
29 char* nodename = m_conf->getconf(
"collector",
"nodename");
34 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/collector";
36 mkdir(execdir.c_str(), 0755);
37 chdir(execdir.c_str());
40 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
46 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
47 string(m_conf->getconf(
"collector",
"ringbufin"));
48 int rbinsize = m_conf->getconfi(
"collector",
"ringbufinsize");
49 m_rbufin =
new RingBuffer(rbufin.c_str(), rbinsize);
51 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
52 string(m_conf->getconf(
"collector",
"ringbufout"));
53 int rboutsize = m_conf->getconfi(
"collector",
"ringbufoutsize");
54 m_rbufout =
new RingBuffer(rbufout.c_str(), rboutsize);
60 m_log =
new RFLogManager(nodename, m_conf->getconf(
"system",
"lognode"));
63 m_flow =
new RFFlowStat((
char*)shmname.c_str());
68 m_nnodes = m_conf->getconfi(
"processor",
"nnodes");
69 for (
int i = 0; i < m_nnodes; i++)
70 m_pid_receiver[i] = 0;
74RFOutputServer::~RFOutputServer()
94 string rbufin = string(m_conf->getconf(
"system",
"unitname")) +
":" +
95 string(m_conf->getconf(
"collector",
"ringbufin"));
97 string rbufout = string(m_conf->getconf(
"system",
"unitname")) +
":" +
98 string(m_conf->getconf(
"collector",
"ringbufout"));
101 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
102 string(m_conf->getconf(
"collector",
"nodename"));
106 char* hport = m_conf->getconf(
"collector",
"historecv",
"port");
118 char* src = m_conf->getconf(
"collector",
"destination");
119 if (strstr(src,
"net") != 0) {
121 char* sender = m_conf->getconf(
"collector",
"sender",
"script");
122 char* port = m_conf->getconf(
"collector",
"sender",
"port");
124 sprintf(idbuf,
"%2.2d", RF_OUTPUT_ID);
125 m_pid_sender = m_proc->
Execute(sender, (
char*)rbufout.c_str(), port, (
char*)shmname.c_str(), idbuf);
126 m_flow->clear(RF_OUTPUT_ID);
127 }
else if (strstr(src,
"file") != 0) {
129 char* writer = m_conf->getconf(
"collector",
"writer",
"script");
130 char* file = m_conf->getconf(
"collector",
"writer",
"filename");
131 char* nnode = m_conf->getconf(
"processor",
"nnodes");
132 m_pid_sender = m_proc->
Execute(writer, (
char*)rbufout.c_str(), file, nnode);
138 char* basf2 = m_conf->getconf(
"collector",
"basf2",
"script");
140 basf2 = (
char*) nsmm->datap;
141 printf(
"Configure: basf2 script overridden : %s\n", basf2);
143 m_pid_basf2 = m_proc->
Execute(basf2, (
char*)rbufin.c_str(), (
char*)rbufout.c_str(), hport);
147 int maxnodes = m_conf->getconfi(
"processor",
"nnodes");
148 int idbase = m_conf->getconfi(
"processor",
"idbase");
150 char* hostbase = m_conf->getconf(
"processor",
"hostbase");
151 char* badlist = m_conf->getconf(
"processor",
"badlist");
152 char* port = m_conf->getconf(
"processor",
"sender",
"port");
154 char* receiver = m_conf->getconf(
"collector",
"receiver",
"script");
155 char hostname[512], idname[3], shmid[3];
156 for (
int i = 0; i < maxnodes; i++) {
157 sprintf(idname,
"%2.2d", idbase + i);
158 sprintf(shmid,
"%2.2d", i);
159 if (badlist == NULL ||
160 strstr(badlist, idname) == 0) {
161 sprintf(hostname,
"%s%2.2d", hostbase, idbase + i);
162 m_pid_receiver[m_nnodes] = m_proc->
Execute(receiver, (
char*)rbufin.c_str(), hostname, port, (
char*)shmname.c_str(), shmid);
177 printf(
"m_pid_sender = %d\n", m_pid_sender);
178 printf(
"m_pid_basf2 = %d\n", m_pid_basf2);
181 if (m_pid_sender != 0) {
182 printf(
"killing sender %d\n", m_pid_sender);
184 kill(m_pid_sender, SIGINT);
185 ws = waitpid(m_pid_sender, &status, 0);
186 printf(
"wait return = %d, status = %d\n", ws, status);
189 if (m_pid_basf2 != 0) {
190 printf(
"killing sender %d\n", m_pid_sender);
191 kill(m_pid_basf2, SIGINT);
192 ws = waitpid(m_pid_basf2, &status, 0);
193 printf(
"wait return = %d, status = %d\n", ws, status);
196 for (
int i = 0; i < m_nnodes; i++) {
197 if (m_pid_receiver[i] != 0) {
198 printf(
"killing receiver %d\n", m_pid_receiver[i]);
199 kill(m_pid_receiver[i], SIGINT);
200 ws = waitpid(m_pid_receiver[i], &status, 0);
201 printf(
"wait return = %d, status = %d\n", ws, status);
210 m_flow->fillProcessStatus(GetNodeInfo());
212 printf(
"Unconfigure done\n");
233 printf(
"RFOutputServer : Restarting!!!!!!\n");
246 RFOutputServer::UnConfigure(nsmmsg, nsmcontext);
248 RFOutputServer::Configure(nsmmsg, nsmcontext);
254void RFOutputServer::server()
256 m_flow->fillProcessStatus(GetNodeInfo());
259 pid_t pid = m_proc->CheckProcess();
261 printf(
"RFOutputServer : process dead. pid=%d\n", pid);
262 if (pid == m_pid_sender) {
263 m_log->Fatal(
"RFOutputServer : sender process dead. pid=%d\n", pid);
265 }
else if (pid == m_pid_basf2) {
266 m_log->Fatal(
"RFOutputServer : basf2 process dead. pid=%d\n", pid);
269 for (
int i = 0; i < m_nnodes; i++) {
270 if (pid == m_pid_receiver[i]) {
271 m_log->Fatal(
"RFOutputServer : receiver process %d dead. pid=%d\n", i, pid);
272 m_pid_receiver[i] = 0;
279 int st = m_proc->CheckOutput();
281 perror(
"RFOutputServer::server");
284 m_log->ProcessLog(m_proc->GetFd());
286 m_flow->fillNodeInfo(RF_OUTPUT_ID, GetNodeInfo(),
true);
287 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver[recv_id], m_pid_sender,
291void RFOutputServer::cleanup()
293 printf(
"RFOutputServer : cleaning up\n");
294 UnConfigure(NULL, NULL);
295 printf(
"RFOutputServer: Done. Exitting\n");
int Execute(char *script, int nargs, char **args)
Class to manage a Ring Buffer placed in an IPC shared memory.
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Abstract base class for different kinds of events.