9#include "daq/rfarm/manager/RFEventServer.h"
20#define RFEVSOUT stdout
28RFEventServer::RFEventServer(
string conffile)
31 m_conf =
new RFConf(conffile.c_str());
32 char* nodename = m_conf->getconf(
"distributor",
"nodename");
37 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/distributor";
39 mkdir(execdir.c_str(), 0755);
40 chdir(execdir.c_str());
43 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
44 string(m_conf->getconf(
"distributor",
"nodename"));
52 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
53 string(m_conf->getconf(
"distributor",
"ringbuffer"));
54 int rbufsize = m_conf->getconfi(
"distributor",
"ringbuffersize");
55 m_rbufin =
new RingBuffer(ringbuf.c_str(), rbufsize);
58 m_log =
new RFLogManager(nodename, m_conf->getconf(
"system",
"lognode"));
61 m_flow =
new RFFlowStat((
char*)shmname.c_str());
65 for (
int i = 0; i < m_nnodes; i++)
70RFEventServer::~RFEventServer()
101 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
102 string(m_conf->getconf(
"distributor",
"ringbuffer"));
105 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
106 string(m_conf->getconf(
"distributor",
"nodename"));
110 int maxnodes = m_conf->getconfi(
"processor",
"nnodes");
111 int idbase = m_conf->getconfi(
"processor",
"idbase");
113 char* badlist = m_conf->getconf(
"processor",
"badlist");
115 char* sender = m_conf->getconf(
"distributor",
"sender",
"script");
116 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
118 char idname[3], shmid[3];
119 for (
int i = 0; i < maxnodes; i++) {
120 sprintf(idname,
"%2.2d", idbase + i);
121 sprintf(shmid,
"%2.2d", i);
122 if (badlist == NULL ||
123 strstr(badlist, idname) == 0) {
124 int port = (idbase + i) + portbase;
126 sprintf(portchar,
"%d", port);
127 m_pid_sender[m_nnodes] = m_proc->
Execute(sender, (
char*)ringbuf.c_str(), portchar, (
char*)shmname.c_str(), shmid);
134 char* srcG = m_conf->getconf(
"distributor",
"source");
135 if (strstr(srcG,
"net") != 0) {
137 char* receiver = m_conf->getconf(
"distributor",
"receiver",
"script");
138 char* src = m_conf->getconf(
"distributor",
"receiver",
"host");
139 char* port = m_conf->getconf(
"distributor",
"receiver",
"port");
142 sprintf(idbuf,
"%2.2d", RF_INPUT_ID);
143 m_pid_recv = m_proc->
Execute(receiver, (
char*)ringbuf.c_str(), src, port, (
char*)shmname.c_str(), idbuf);
144 m_flow->clear(RF_INPUT_ID);
145 }
else if (strstr(srcG,
"file") != 0) {
147 char* filein = m_conf->getconf(
"distributor",
"fileinput",
"script");
148 char* file = m_conf->getconf(
"distributor",
"fileinput",
"filename");
150 char* nnodechr = m_conf->getconf(
"distributor",
"nnodes");
151 m_pid_recv = m_proc->
Execute(filein, (
char*)ringbuf.c_str(), file, nnodechr);
164 if (m_pid_recv != 0) {
165 kill(m_pid_recv, SIGINT);
166 waitpid(m_pid_recv, &status, 0);
168 for (
int i = 0; i < m_nnodes; i++) {
169 if (m_pid_sender[i] != 0) {
170 printf(
"RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
172 kill(m_pid_sender[i], SIGINT);
173 waitpid(m_pid_sender[i], &status, 0);
180 m_flow->fillProcessStatus(GetNodeInfo());
182 printf(
"Unconfigure : done\n");
202 printf(
"RFEventServer : Restarting!!!!!\n");
219 RFEventServer::UnConfigure(nsmmsg, nsmcontext);
221 RFEventServer::Configure(nsmmsg, nsmcontext);
227void RFEventServer::server()
230 m_flow->fillProcessStatus(GetNodeInfo());
234 pid_t pid = m_proc->CheckProcess();
236 printf(
"RFEventServer : process dead. pid = %d\n", pid);
237 if (pid == m_pid_recv) {
238 m_log->Fatal(
"RFEventServer : receiver process dead. pid=%d\n", pid);
241 for (
int i = 0; i < m_nnodes; i++) {
242 if (pid == m_pid_sender[i]) {
243 m_log->Fatal(
"RFEventServer : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
250 int st = m_proc->CheckOutput();
252 perror(
"RFEventServer::server");
255 m_log->ProcessLog(m_proc->GetFd());
257 m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(),
false);
258 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
268void RFEventServer::cleanup()
270 printf(
"RFEventServer : cleaning up\n");
271 UnConfigure(NULL, NULL);
283 printf(
"RFEventServer: Done. Exitting\n");
int Execute(char *script, int nargs, char **args)
Class to manage a ring buffer placed in IPC shared memory.
int clear()
Clear the RingBuffer.
void forceClear()
Forcefully clear the RingBuffer, also resetting its semaphore.
Abstract base class for different kinds of events.