9#include "daq/expreco/ERecoDistributor.h"
20#define RFEVSOUT stdout
25ERecoDistributor::ERecoDistributor(
string conffile)
28 m_conf =
new RFConf(conffile.c_str());
29 char* nodename = m_conf->getconf(
"distributor",
"nodename");
34 string execdir = string(m_conf->getconf(
"system",
"execdir_base")) +
"/distributor";
36 mkdir(execdir.c_str(), 0755);
37 chdir(execdir.c_str());
40 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
41 string(m_conf->getconf(
"distributor",
"nodename"));
49 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
50 string(m_conf->getconf(
"distributor",
"ringbuffer"));
51 int rbufsize = m_conf->getconfi(
"distributor",
"ringbuffersize");
52 m_rbufin =
new RingBuffer(ringbuf.c_str(), rbufsize);
55 m_log =
new RFLogManager(nodename, m_conf->getconf(
"system",
"lognode"));
58 m_flow =
new RFFlowStat((
char*)shmname.c_str());
62ERecoDistributor::~ERecoDistributor()
79 string ringbuf = string(m_conf->getconf(
"system",
"unitname")) +
":" +
80 string(m_conf->getconf(
"distributor",
"ringbuffer"));
83 string shmname = string(m_conf->getconf(
"system",
"unitname")) +
":" +
84 string(m_conf->getconf(
"distributor",
"nodename"));
88 int maxnodes = m_conf->getconfi(
"processor",
"nnodes");
89 int idbase = m_conf->getconfi(
"processor",
"idbase");
91 char* badlist = m_conf->getconf(
"processor",
"badlist");
93 char* sender = m_conf->getconf(
"distributor",
"sender",
"script");
94 int portbase = m_conf->getconfi(
"distributor",
"sender",
"portbase");
97 char hostname[512], idname[3], shmid[3];
98 for (
int i = 0; i < maxnodes; i++) {
99 sprintf(idname,
"%2.2d", idbase + i);
100 sprintf(shmid,
"%2.2d", i);
101 if (badlist == NULL ||
102 strstr(badlist, idname) == 0) {
103 int port = (idbase + i) + portbase;
105 sprintf(portchar,
"%d", port);
106 m_pid_sender[m_nnodes] = m_proc->
Execute(sender, (
char*)ringbuf.c_str(), portchar, (
char*)shmname.c_str(), shmid);
107 printf(
"Running sender to %d\n", port);
115 int maxrecv = m_conf->getconfi(
"distributor",
"receiver",
"nnodes");
116 int ridbase = m_conf->getconfi(
"distributor",
"receiver",
"idbase");
117 char* rhostbase = m_conf->getconf(
"distributor",
"receiver",
"hostbase");
118 char* rbadlist = m_conf->getconf(
"distributor",
"receiver",
"badlist");
119 char* port = m_conf->getconf(
"distributor",
"receiver",
"port");
120 char* receiver = m_conf->getconf(
"distributor",
"receiver",
"script");
123 for (
int i = 0; i < maxrecv; i++) {
124 sprintf(hostname,
"%s%2.2d", rhostbase, ridbase + i);
125 sprintf(idname,
"%2.2d", ridbase + i);
126 sprintf(shmid,
"%2.2d", i + 20);
127 if (rbadlist == NULL ||
128 strstr(rbadlist, hostname) == 0) {
129 printf(
"Running receiver for %s\n", hostname);
131 m_pid_recv[m_nrecv] = m_proc->
Execute(receiver, (
char*)ringbuf.c_str(), hostname, port, (
char*)shmname.c_str(), shmid);
136 printf(
"ERecoDistributor : Configure done\n");
145 printf(
"m_nrecv = %d\n", m_nrecv);
146 for (
int i = 0; i < m_nrecv; i++) {
147 if (m_pid_recv[i] != 0) {
148 printf(
"ERecoDistributor : killing receiver pid=%d\n", m_pid_recv[i]);
149 kill(m_pid_recv[i], SIGKILL);
150 waitpid(m_pid_recv[i], &status, 0);
154 printf(
"m_nnodes = %d\n", m_nnodes);
155 for (
int i = 0; i < m_nnodes; i++) {
156 if (m_pid_sender[i] != 0) {
157 printf(
"ERecoDistributor : killing sender pid=%d\n", m_pid_sender[i]);
159 kill(m_pid_sender[i], SIGKILL);
160 waitpid(m_pid_sender[i], &status, 0);
167 m_flow->fillProcessStatus(GetNodeInfo());
169 printf(
"ERecoDistributor : Unconfigure done\n");
188 printf(
"ERecoDistributor : Restarting!!!!!\n");
191 ERecoDistributor::UnConfigure(nsmmsg, nsmcontext);
193 ERecoDistributor::Configure(nsmmsg, nsmcontext);
199void ERecoDistributor::server()
202 m_flow->fillProcessStatus(GetNodeInfo());
205 pid_t pid = m_proc->CheckProcess();
207 printf(
"ERecoDistributor : process dead. pid = %d\n", pid);
208 for (
int i = 0; i < m_nrecv; i++) {
209 if (pid == m_pid_recv[i]) {
210 m_log->Fatal(
"ERecoDistributor : receiver process (%d) dead. pid=%d\n", i, pid);
214 for (
int i = 0; i < m_nnodes; i++) {
215 if (pid == m_pid_sender[i]) {
216 m_log->Fatal(
"ERecoDistributor : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
222 int st = m_proc->CheckOutput();
224 perror(
"ERecoDistributor::server");
227 m_log->ProcessLog(m_proc->GetFd());
int Execute(char *script, int nargs, char **args)
Class to manage a Ring Buffer placed in an IPC shared memory.
int clear()
Clear the RingBuffer.
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Abstract base class for different kinds of events.