10 #include "daq/rfarm/manager/RFEventServer.h"
13 #include <sys/types.h>
21 #define RFEVSOUT stdout
// Constructor: sets up the event server from the configuration file `conffile`.
// NOTE(review): this listing is incomplete (braces and some statements are not
// visible), so the comments below describe only the statements shown.
29 RFEventServer::RFEventServer(
string conffile)
// Load the configuration; m_conf is used for every lookup below.
32 m_conf =
new RFConf(conffile.c_str());
33 char* nodename = m_conf->getconf(
"distributor",
"nodename");
// Working directory is "<system.execdir_base>/distributor".
38 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/distributor";
// NOTE(review): the results of mkdir() and chdir() are not checked here, so a
// failure to create or enter the directory goes unnoticed.
40 mkdir(execdir.c_str(), 0755);
41 chdir(execdir.c_str());
// Name passed to RFFlowStat below: "<system.unitname>:<distributor.nodename>".
44 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
45 string(m_conf->getconf(
"distributor",
"nodename"));
// Ring buffer name: "<system.unitname>:<distributor.ringbuffer>".
53 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
54 string(m_conf->getconf(
"distributor",
"ringbuffer"));
55 int rbufsize = m_conf->getconfi(
"distributor",
"ringbuffersize");
// Input ring buffer, created with the configured name and size.
56 m_rbufin =
new RingBuffer(ringbuf.c_str(), rbufsize);
// Log manager is given this node's name and the configured "lognode".
59 m_log =
new RFLogManager(nodename, m_conf->getconf(
"system",
"lognode"));
// Flow statistics object keyed by shmname; the cast drops const because the
// constructor is called with a char* argument.
62 m_flow =
new RFFlowStat((
char*)shmname.c_str());
// Loop over the m_nnodes entries; the loop body is not visible in this listing.
// NOTE(review): no assignment to m_nnodes is visible before this loop — confirm
// it is initialized elsewhere.
66 for (
int i = 0; i < m_nnodes; i++)
71 RFEventServer::~RFEventServer()
// Fragment of a member function whose signature line is not visible in this
// listing; presumably RFEventServer::Configure — TODO confirm.
// It launches one sender per usable processor node and then one input process
// (network receiver or file reader), all attached to the same ring buffer.
// Ring buffer name: "<system.unitname>:<distributor.ringbuffer>".
102 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
103 string(m_conf->getconf(
"distributor",
"ringbuffer"));
// Shared name handed to the child scripts: "<system.unitname>:<distributor.nodename>".
106 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
107 string(m_conf->getconf(
"distributor",
"nodename"));
// Processor node settings read from the "processor" section.
111 int maxnodes = m_conf->getconfi(
"processor",
"nnodes");
112 int idbase = m_conf->getconfi(
"processor",
"idbase");
113 char* hostbase = m_conf->getconf(
"processor",
"hostbase");
114 char* badlist = m_conf->getconf(
"processor",
"badlist");
// Sender script and the base of the per-node port numbers.
116 char* sender = m_conf->getconf(
"distributor",
"sender",
"script");
117 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
// NOTE(review): idname and shmid hold only 2 characters plus the terminator.
// "%2.2d" emits at least 2 digits but more for values >= 100 or negative ones,
// so the sprintf calls below can write past these 3-byte buffers.
// hostname is declared here but not used in the visible lines.
119 char hostname[512], idname[3], shmid[3];
120 for (
int i = 0; i < maxnodes; i++) {
// idname is the node id (idbase + i), shmid is the loop index, both zero-padded.
121 sprintf(idname,
"%2.2d", idbase + i);
122 sprintf(shmid,
"%2.2d", i);
// A node is used when there is no badlist or its id string does not occur in it.
// NOTE(review): strstr is a plain substring search, so an id also matches when
// it appears inside a longer token of badlist — verify the badlist format.
123 if (badlist == NULL ||
124 strstr(badlist, idname) == 0) {
125 int port = (idbase + i) + portbase;
// portchar's declaration is not visible in this listing.
127 sprintf(portchar,
"%d", port);
// Run the sender script with (ring buffer, port, shmname, shmid) and store the
// result in the next m_pid_sender slot; the m_nnodes update is not visible here.
128 m_pid_sender[m_nnodes] = m_proc->Execute(sender, (
char*)ringbuf.c_str(), portchar, (
char*)shmname.c_str(), shmid);
// Input source selection, based on the "distributor"/"source" string.
// NOTE(review): no NULL check on src is visible before the strstr calls below.
135 char* src = m_conf->getconf(
"distributor",
"source");
136 if (strstr(src,
"net") != 0) {
// Network input: run the receiver script against the configured host and port.
138 char* receiver = m_conf->getconf(
"distributor",
"receiver",
"script");
// NOTE(review): this inner `src` (receiver host) shadows the outer `src`
// (source type) for the rest of this block.
139 char* src = m_conf->getconf(
"distributor",
"receiver",
"host");
140 char* port = m_conf->getconf(
"distributor",
"receiver",
"port");
// idbuf's declaration is not visible in this listing; it receives RF_INPUT_ID
// formatted with the same "%2.2d" pattern.
143 sprintf(idbuf,
"%2.2d", RF_INPUT_ID);
144 m_pid_recv = m_proc->Execute(receiver, (
char*)ringbuf.c_str(), src, port, (
char*)shmname.c_str(), idbuf);
145 m_flow->clear(RF_INPUT_ID);
146 }
else if (strstr(src,
"file") != 0) {
// File input: run the file-input script with the file name and the
// "distributor"/"nnodes" value passed through as a string.
148 char* filein = m_conf->getconf(
"distributor",
"fileinput",
"script");
149 char* file = m_conf->getconf(
"distributor",
"fileinput",
"filename");
151 char* nnodechr = m_conf->getconf(
"distributor",
"nnodes");
152 m_pid_recv = m_proc->Execute(filein, (
char*)ringbuf.c_str(), file, nnodechr);
// Clear the input ring buffer; the enclosing branch structure is not visible here.
155 m_rbufin->forceClear();
// Fragment of a member function whose signature line is not visible in this
// listing; presumably RFEventServer::UnConfigure — TODO confirm.
// Stops the input process and every sender, then clears the ring buffer.
// A pid of 0 is treated as "no process" and skipped.
165 if (m_pid_recv != 0) {
// Send SIGINT to the input process and block until it has exited.
// `status` is declared on a line not visible in this listing.
166 kill(m_pid_recv, SIGINT);
167 waitpid(m_pid_recv, &status, 0);
// Same SIGINT-then-wait sequence for each of the m_nnodes sender slots.
169 for (
int i = 0; i < m_nnodes; i++) {
170 if (m_pid_sender[i] != 0) {
171 printf(
"RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
173 kill(m_pid_sender[i], SIGINT);
174 waitpid(m_pid_sender[i], &status, 0);
// Clear the input ring buffer.
178 m_rbufin->forceClear();
// Update the flow-stat process status from the current node info.
181 m_flow->fillProcessStatus(GetNodeInfo());
183 printf(
"Unconfigure : done\n");
// Fragment of a member function whose signature line is not visible in this
// listing; presumably the restart handler — TODO confirm its name.
// Announces the restart, then calls UnConfigure followed by Configure with the
// same nsmmsg / nsmcontext arguments.
203 printf(
"RFEventServer : Restarting!!!!!\n");
220 RFEventServer::UnConfigure(nsmmsg, nsmcontext);
222 RFEventServer::Configure(nsmmsg, nsmcontext);
// Monitoring routine: reports dead child processes, forwards child output to the
// log manager and updates flow statistics.
// NOTE(review): braces, loop structure and several conditions are not visible in
// this listing, so only the statements shown are described.
228 void RFEventServer::server()
231 m_flow->fillProcessStatus(GetNodeInfo());
// Ask the process manager for a pid; the condition guarding the report below is
// not visible here.
235 pid_t pid = m_proc->CheckProcess();
237 printf(
"RFEventServer : process dead. pid = %d\n", pid);
// Log a fatal message naming the process the pid belongs to: the input process...
238 if (pid == m_pid_recv) {
239 m_log->Fatal(
"RFEventServer : receiver process dead. pid=%d\n", pid);
// ...or one of the m_nnodes senders (index and pid are both reported).
242 for (
int i = 0; i < m_nnodes; i++) {
243 if (pid == m_pid_sender[i]) {
244 m_log->Fatal(
"RFEventServer : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
// Check the children's output; the test on `st` that leads to perror is not
// visible in this listing.
251 int st = m_proc->CheckOutput();
253 perror(
"RFEventServer::server");
// Hand the process manager's file descriptor to the log manager.
256 m_log->ProcessLog(m_proc->GetFd());
// Update the input-side flow statistics and the process status.
258 m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(),
false);
// NOTE(review): sender_id is not defined in the visible lines — confirm where it
// is set and that it stays within the m_pid_sender bounds.
259 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
// Shutdown routine: announces the cleanup, calls UnConfigure with NULL arguments
// and prints a final message.
// NOTE(review): the statements between UnConfigure and the final printf are not
// visible in this listing.
269 void RFEventServer::cleanup()
271 printf(
"RFEventServer : cleaning up\n");
272 UnConfigure(NULL, NULL);
284 printf(
"RFEventServer: Done. Exitting\n");