98{
99
100
101 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
102 string(m_conf->getconf("distributor", "ringbuffer"));
103
104
105 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
106 string(m_conf->getconf("distributor", "nodename"));
107
108
109 m_nnodes = 0;
110 int maxnodes = m_conf->getconfi("processor", "nnodes");
111 int idbase = m_conf->getconfi("processor", "idbase");
112
113 char* badlist = m_conf->getconf("processor", "badlist");
114
115 char* sender = m_conf->getconf("distributor", "sender", "script");
116 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
117
118 char idname[3], shmid[3];
119 for (int i = 0; i < maxnodes; i++) {
120 sprintf(idname, "%2.2d", idbase + i);
121 sprintf(shmid, "%2.2d", i);
122 if (badlist == NULL ||
123 strstr(badlist, idname) == 0) {
124 int port = (idbase + i) + portbase;
125 char portchar[256];
126 sprintf(portchar, "%d", port);
127 m_pid_sender[m_nnodes] = m_proc->
Execute(sender, (
char*)ringbuf.c_str(), portchar, (
char*)shmname.c_str(), shmid);
128 m_flow->clear(i);
129 m_nnodes++;
130 }
131 }
132
133
134 char* srcG = m_conf->getconf("distributor", "source");
135 if (strstr(srcG, "net") != 0) {
136
137 char* receiver = m_conf->getconf("distributor", "receiver", "script");
138 char* src = m_conf->getconf("distributor", "receiver", "host");
139 char* port = m_conf->getconf("distributor", "receiver", "port");
140
141 char idbuf[3];
142 sprintf(idbuf, "%2.2d", RF_INPUT_ID);
143 m_pid_recv = m_proc->
Execute(receiver, (
char*)ringbuf.c_str(), src, port, (
char*)shmname.c_str(), idbuf);
144 m_flow->clear(RF_INPUT_ID);
145 } else if (strstr(srcG, "file") != 0) {
146
147 char* filein = m_conf->getconf("distributor", "fileinput", "script");
148 char* file = m_conf->getconf("distributor", "fileinput", "filename");
149
150 char* nnodechr = m_conf->getconf("distributor", "nnodes");
151 m_pid_recv = m_proc->
Execute(filein, (
char*)ringbuf.c_str(), file, nnodechr);
152 }
153
155
156
157 return 0;
158}
int Execute(char *script, int nargs, char **args)
void forceClear()
Forcefully clear the RingBuffer, resetting the semaphore as well.