10 #include "daq/expreco/ERecoDistributor.h"
13 #include <sys/types.h>
21 #define RFEVSOUT stdout
26 ERecoDistributor::ERecoDistributor(
string conffile)
29 m_conf =
new RFConf(conffile.c_str());
30 char* nodename = m_conf->getconf(
"distributor",
"nodename");
35 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/distributor";
37 mkdir(execdir.c_str(), 0755);
38 chdir(execdir.c_str());
41 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
42 string(m_conf->getconf(
"distributor",
"nodename"));
50 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
51 string(m_conf->getconf(
"distributor",
"ringbuffer"));
52 int rbufsize = m_conf->getconfi(
"distributor",
"ringbuffersize");
53 m_rbufin =
new RingBuffer(ringbuf.c_str(), rbufsize);
56 m_log =
new RFLogManager(nodename, m_conf->getconf(
"system",
"lognode"));
59 m_flow =
new RFFlowStat((
char*)shmname.c_str());
63 ERecoDistributor::~ERecoDistributor()
80 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
81 string(m_conf->getconf(
"distributor",
"ringbuffer"));
84 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
85 string(m_conf->getconf(
"distributor",
"nodename"));
89 int maxnodes = m_conf->getconfi(
"processor",
"nnodes");
90 int idbase = m_conf->getconfi(
"processor",
"idbase");
91 char* hostbase = m_conf->getconf(
"processor",
"hostbase");
92 char* badlist = m_conf->getconf(
"processor",
"badlist");
94 char* sender = m_conf->getconf(
"distributor",
"sender",
"script");
95 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
98 char hostname[512], idname[3], shmid[3];
99 for (
int i = 0; i < maxnodes; i++) {
100 sprintf(idname,
"%2.2d", idbase + i);
101 sprintf(shmid,
"%2.2d", i);
102 if (badlist == NULL ||
103 strstr(badlist, idname) == 0) {
104 int port = (idbase + i) + portbase;
106 sprintf(portchar,
"%d", port);
107 m_pid_sender[m_nnodes] = m_proc->Execute(sender, (
char*)ringbuf.c_str(), portchar, (
char*)shmname.c_str(), shmid);
108 printf(
"Running sender to %d\n", port);
116 int maxrecv = m_conf->getconfi(
"distributor",
"receiver",
"nnodes");
117 int ridbase = m_conf->getconfi(
"distributor",
"receiver",
"idbase");
118 char* rhostbase = m_conf->getconf(
"distributor",
"receiver",
"hostbase");
119 char* rbadlist = m_conf->getconf(
"distributor",
"receiver",
"badlist");
120 char* port = m_conf->getconf(
"distributor",
"receiver",
"port");
121 char* receiver = m_conf->getconf(
"distributor",
"receiver",
"script");
124 for (
int i = 0; i < maxrecv; i++) {
125 sprintf(hostname,
"%s%2.2d", rhostbase, ridbase + i);
126 sprintf(idname,
"%2.2d", ridbase + i);
127 sprintf(shmid,
"%2.2d", i + 20);
128 if (rbadlist == NULL ||
129 strstr(rbadlist, hostname) == 0) {
130 printf(
"Running receiver for %s\n", hostname);
132 m_pid_recv[m_nrecv] = m_proc->Execute(receiver, (
char*)ringbuf.c_str(), hostname, port, (
char*)shmname.c_str(), shmid);
137 printf(
"ERecoDistributor : Configure done\n");
146 printf(
"m_nrecv = %d\n", m_nrecv);
147 for (
int i = 0; i < m_nrecv; i++) {
148 if (m_pid_recv[i] != 0) {
149 printf(
"ERecoDistributor : killing receiver pid=%d\n", m_pid_recv[i]);
150 kill(m_pid_recv[i], SIGKILL);
151 waitpid(m_pid_recv[i], &status, 0);
155 printf(
"m_nnodes = %d\n", m_nnodes);
156 for (
int i = 0; i < m_nnodes; i++) {
157 if (m_pid_sender[i] != 0) {
158 printf(
"ERecoDistributor : killing sender pid=%d\n", m_pid_sender[i]);
160 kill(m_pid_sender[i], SIGKILL);
161 waitpid(m_pid_sender[i], &status, 0);
165 m_rbufin->forceClear();
168 m_flow->fillProcessStatus(GetNodeInfo());
170 printf(
"ERecoDistributor : Unconfigure done\n");
189 printf(
"ERecoDistributor : Restarting!!!!!\n");
192 ERecoDistributor::UnConfigure(nsmmsg, nsmcontext);
194 ERecoDistributor::Configure(nsmmsg, nsmcontext);
200 void ERecoDistributor::server()
203 m_flow->fillProcessStatus(GetNodeInfo());
207 pid_t pid = m_proc->CheckProcess();
209 printf(
"ERecoDistributor : process dead. pid = %d\n", pid);
210 for (
int i = 0; i < m_nrecv; i++) {
211 if (pid == m_pid_recv[i]) {
212 m_log->Fatal(
"ERecoDistributor : receiver process (%d) dead. pid=%d\n", i, pid);
216 for (
int i = 0; i < m_nnodes; i++) {
217 if (pid == m_pid_sender[i]) {
218 m_log->Fatal(
"ERecoDistributor : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
225 int st = m_proc->CheckOutput();
227 perror(
"ERecoDistributor::server");
230 m_log->ProcessLog(m_proc->GetFd());