Belle II Software development
RFOutputServer.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
8
#include "daq/rfarm/manager/RFOutputServer.h"

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>

#include <unistd.h>

#include <csignal>
#include <cstdio>
#include <cstring>
19
20#define RFOTSOUT stdout
21
22using namespace std;
23using namespace Belle2;
24
25RFOutputServer::RFOutputServer(string conffile)
26{
27 // 0. Initialize configuration manager
28 m_conf = new RFConf(conffile.c_str());
29 char* nodename = m_conf->getconf("collector", "nodename");
30 // char nodename[256];
31 // gethostname(nodename, sizeof(nodename));
32
33 // 1. Set execution directory
34 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/collector";
35
36 mkdir(execdir.c_str(), 0755);
37 chdir(execdir.c_str());
38
39 // 2. Initialize local shared memory
40 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
41 string(nodename);
42 m_shm = new RFSharedMem((char*)shmname.c_str());
43
44 // 3. Initialize RingBuffers
45 // char* rbufin = m_conf->getconf("collector", "ringbufin");
46 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
47 string(m_conf->getconf("collector", "ringbufin"));
48 int rbinsize = m_conf->getconfi("collector", "ringbufinsize");
49 m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
50 // char* rbufout = m_conf->getconf("collector", "ringbufout");
51 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
52 string(m_conf->getconf("collector", "ringbufout"));
53 int rboutsize = m_conf->getconfi("collector", "ringbufoutsize");
54 m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
55
56 // 4. Initialize process manager
57 m_proc = new RFProcessManager(nodename);
58
59 // 5. Initialize LogManager
60 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
61
62 // 6. Initialize data flow monitor
63 m_flow = new RFFlowStat((char*)shmname.c_str());
64
65 // 7. Clear PID list
66 m_pid_sender = 0;
67 m_pid_basf2 = 0;
68 m_nnodes = m_conf->getconfi("processor", "nnodes");
69 for (int i = 0; i < m_nnodes; i++)
70 m_pid_receiver[i] = 0;
71
72}
73
74RFOutputServer::~RFOutputServer()
75{
76 delete m_log;
77 delete m_proc;
78 delete m_shm;
79 delete m_conf;
80 delete m_rbufin;
81 delete m_rbufout;
82 delete m_flow;
83}
84
85
86// Functions hooked up by NSM2
87
88int RFOutputServer::Configure(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
89{
90 // Start processes from down stream
91
92 // 0. Get common parameters
93 // char* rbufin = m_conf->getconf("collector", "ringbufin");
94 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
95 string(m_conf->getconf("collector", "ringbufin"));
96 // char* rbufout = m_conf->getconf("collector", "ringbufout");
97 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
98 string(m_conf->getconf("collector", "ringbufout"));
99
100 // char* shmname = m_conf->getconf("collector", "nodename");
101 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
102 string(m_conf->getconf("collector", "nodename"));
103
104 // 1. Histogram Receiver
105 // char* hrecv = m_conf->getconf("collector", "historecv", "script");
106 char* hport = m_conf->getconf("collector", "historecv", "port");
107 // char* mapfile = m_conf->getconf("collector", "historecv", "mapfile");
108 // m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
109
110 // 2. Histogram Relay
111 // char* hrelay = m_conf->getconf("collector", "historelay", "script");
112 // char* dqmdest = m_conf->getconf("dqmserver", "host");
113 // char* dqmport = m_conf->getconf("dqmserver", "port");
114 // char* interval = m_conf->getconf("collector", "historelay", "interval");
115 // m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
116
117 // 3. Run sender / logger
118 char* src = m_conf->getconf("collector", "destination");
119 if (strstr(src, "net") != 0) {
120 // Run sender
121 char* sender = m_conf->getconf("collector", "sender", "script");
122 char* port = m_conf->getconf("collector", "sender", "port");
123 char idbuf[3];
124 sprintf(idbuf, "%2.2d", RF_OUTPUT_ID);
125 m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), idbuf);
126 m_flow->clear(RF_OUTPUT_ID);
127 } else if (strstr(src, "file") != 0) {
128 // Run file writer
129 char* writer = m_conf->getconf("collector", "writer", "script");
130 char* file = m_conf->getconf("collector", "writer", "filename");
131 char* nnode = m_conf->getconf("processor", "nnodes");
132 m_pid_sender = m_proc->Execute(writer, (char*)rbufout.c_str(), file, nnode);
133 } else {
134 // Do not run anything
135 }
136
137 // 4. Run basf2
138 char* basf2 = m_conf->getconf("collector", "basf2", "script");
139 if (nsmm->len > 0) {
140 basf2 = (char*) nsmm->datap;
141 printf("Configure: basf2 script overridden : %s\n", basf2);
142 }
143 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
144
145 // 5. Run receiver
146 m_nnodes = 0;
147 int maxnodes = m_conf->getconfi("processor", "nnodes");
148 int idbase = m_conf->getconfi("processor", "idbase");
149 // char* hostbase = m_conf->getconf("processor", "hostbase");
150 char* hostbase = m_conf->getconf("processor", "hostbase");
151 char* badlist = m_conf->getconf("processor", "badlist");
152 char* port = m_conf->getconf("processor", "sender", "port");
153
154 char* receiver = m_conf->getconf("collector", "receiver", "script");
155 char hostname[512], idname[3], shmid[3];
156 for (int i = 0; i < maxnodes; i++) {
157 sprintf(idname, "%2.2d", idbase + i);
158 sprintf(shmid, "%2.2d", i);
159 if (badlist == NULL ||
160 strstr(badlist, idname) == 0) {
161 sprintf(hostname, "%s%2.2d", hostbase, idbase + i);
162 m_pid_receiver[m_nnodes] = m_proc->Execute(receiver, (char*)rbufin.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
163 m_flow->clear(i);
164 m_nnodes++;
165 }
166 }
167
168 m_rbufin->forceClear();
169 m_rbufout->forceClear();
170 return 0;
171}
172
173int RFOutputServer::UnConfigure(NSMmsg*, NSMcontext*)
174{
175 // system("killall sock2rbr rb2sockr basf2 hrelay hserver");
176
177 printf("m_pid_sender = %d\n", m_pid_sender);
178 printf("m_pid_basf2 = %d\n", m_pid_basf2);
179 fflush(stdout);
180 int status, ws;
181 if (m_pid_sender != 0) {
182 printf("killing sender %d\n", m_pid_sender);
183 // kill(m_pid_sender, SIGINT);
184 kill(m_pid_sender, SIGINT);
185 ws = waitpid(m_pid_sender, &status, 0);
186 printf("wait return = %d, status = %d\n", ws, status);
187 }
188
189 if (m_pid_basf2 != 0) {
190 printf("killing sender %d\n", m_pid_sender);
191 kill(m_pid_basf2, SIGINT);
192 ws = waitpid(m_pid_basf2, &status, 0);
193 printf("wait return = %d, status = %d\n", ws, status);
194 }
195
196 for (int i = 0; i < m_nnodes; i++) {
197 if (m_pid_receiver[i] != 0) {
198 printf("killing receiver %d\n", m_pid_receiver[i]);
199 kill(m_pid_receiver[i], SIGINT);
200 ws = waitpid(m_pid_receiver[i], &status, 0);
201 printf("wait return = %d, status = %d\n", ws, status);
202 }
203 }
204
205 // Clear RingBuffer
206 m_rbufin->forceClear();
207 m_rbufout->forceClear();
208
209 // Clear process list
210 m_flow->fillProcessStatus(GetNodeInfo());
211
212 printf("Unconfigure done\n");
213 fflush(stdout);
214 return 0;
215}
216
217int RFOutputServer::Start(NSMmsg*, NSMcontext*)
218{
219 // Clear RingBuffer
220 m_rbufout->forceClear();
221 // m_rbufin->forceClear();
222 return 0;
223}
224
225int RFOutputServer::Stop(NSMmsg*, NSMcontext*)
226{
227 return 0;
228}
229
230
231int RFOutputServer::Restart(NSMmsg*, NSMcontext*)
232{
233 printf("RFOutputServer : Restarting!!!!!!\n");
234 /* Original Impl
235 kill(m_pid_sender, SIGINT);
236 kill(m_pid_basf2, SIGINT);
237 for (int i = 0; i < m_nnodes; i++) {
238 kill(m_pid_receiver[i], SIGINT);
239 }
240 // Simple Implementation
241 system("killall sock2rbr rb2sockr basf2 hrelay hserver");
242 fflush(stdout);
243 */
244 NSMmsg* nsmmsg = NULL;
245 NSMcontext* nsmcontext = NULL;
246 RFOutputServer::UnConfigure(nsmmsg, nsmcontext);
247 sleep(2);
248 RFOutputServer::Configure(nsmmsg, nsmcontext);
249 return 0;
250}
251
252// Server function
253
254void RFOutputServer::server()
255{
256 m_flow->fillProcessStatus(GetNodeInfo());
257 while (true) {
258 int recv_id = 0;
259 pid_t pid = m_proc->CheckProcess();
260 if (pid > 0) {
261 printf("RFOutputServer : process dead. pid=%d\n", pid);
262 if (pid == m_pid_sender) {
263 m_log->Fatal("RFOutputServer : sender process dead. pid=%d\n", pid);
264 m_pid_sender = 0;
265 } else if (pid == m_pid_basf2) {
266 m_log->Fatal("RFOutputServer : basf2 process dead. pid=%d\n", pid);
267 m_pid_basf2 = 0;
268 } else {
269 for (int i = 0; i < m_nnodes; i++) {
270 if (pid == m_pid_receiver[i]) {
271 m_log->Fatal("RFOutputServer : receiver process %d dead. pid=%d\n", i, pid);
272 m_pid_receiver[i] = 0;
273 recv_id = i;
274 }
275 }
276 }
277 }
278
279 int st = m_proc->CheckOutput();
280 if (st < 0) {
281 perror("RFOutputServer::server");
282 // exit ( -1 );
283 } else if (st > 0) {
284 m_log->ProcessLog(m_proc->GetFd());
285 }
286 m_flow->fillNodeInfo(RF_OUTPUT_ID, GetNodeInfo(), true);
287 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver[recv_id], m_pid_sender,
288 m_pid_basf2);
289 }
290}
291void RFOutputServer::cleanup()
292{
293 printf("RFOutputServer : cleaning up\n");
294 UnConfigure(NULL, NULL);
295 printf("RFOutputServer: Done. Exitting\n");
296 exit(-1);
297}
298
299
300
int Execute(char *script, int nargs, char **args)
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224