Belle II Software development
ERecoEventProcessor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include "daq/expreco/ERecoEventProcessor.h"
10#include <iostream>
11
12#include <sys/stat.h>
13#include <sys/types.h>
14#include <sys/wait.h>
15#include <unistd.h>
16
17#include <csignal>
18#include <cstring>
19
20#define RFOTSOUT stdout
21
22using namespace std;
23using namespace Belle2;
24
25ERecoEventProcessor::ERecoEventProcessor(string conffile)
26{
27 // 0. Initialize configuration manager
28 m_conf = new RFConf(conffile.c_str());
29 // char* nodename = m_conf->getconf ( "processor", "nodename" );
30 // char nodename[256];
31 strcpy(m_nodename, "evp_");
32
33 gethostname(&m_nodename[4], sizeof(m_nodename) - 4);
34 printf("nodename = %s\n", m_nodename);
35
36 // 1. Set execution directory
37 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
38 printf("execdir = %s\n", execdir.c_str());
39
40 mkdir(execdir.c_str(), 0755);
41 chdir(execdir.c_str());
42
43 // 2. Initialize local shared memory
44 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
45 string(m_nodename);
46 m_shm = new RFSharedMem((char*)shmname.c_str());
47
48 // 3. Initialize process manager
49 m_proc = new RFProcessManager(m_nodename);
50
51 // 4. Initialize RingBuffers
52 // char* rbufin = m_conf->getconf("processor", "ringbufin");
53 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
54 string(m_conf->getconf("processor", "ringbufin"));
55 int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
56 // m_rbufin = new RingBuffer(rbufin, rbinsize);
57 m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
58 // char* rbufout = m_conf->getconf("processor", "ringbufout");
59 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
60 string(m_conf->getconf("processor", "ringbufout"));
61 int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
62 // m_rbufout = new RingBuffer(rbufout, rboutsize);
63 m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
64
65 // 5. Initialize LogManager
66 m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
67
68 // 6. Initialize data flow monitor
69 m_flow = new RFFlowStat((char*)shmname.c_str());
70
71}
72
73ERecoEventProcessor::~ERecoEventProcessor()
74{
75 delete m_log;
76 delete m_proc;
77 delete m_shm;
78 delete m_conf;
79 delete m_flow;
80 delete m_rbufin;
81 delete m_rbufout;
82}
83
84
85// Functions hooked up by NSM2
86
87int ERecoEventProcessor::Configure(NSMmsg* /*nsmm*/, NSMcontext* /*nsmc*/)
88{
89 // Start processes from down stream
90
91 // 0. Get common parameters
92 // char* rbufin = m_conf->getconf("processor", "ringbufin");
93 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
94 string(m_conf->getconf("processor", "ringbufin"));
95 // char* rbufout = m_conf->getconf("processor", "ringbufout");
96 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
97 string(m_conf->getconf("processor", "ringbufout"));
98
99 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
100 string(m_nodename);
101
102 // 1. Histogram Receiver
103 char* hrecv = m_conf->getconf("processor", "historecv", "script");
104 char* hport = m_conf->getconf("processor", "historecv", "port");
105 char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
106 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
107
108 sleep(5); // make sure that TMapFile is created.
109
110 // 2. Histogram Relay
111 char* hrelay = m_conf->getconf("processor", "historelay", "script");
112 char* dqmdest = m_conf->getconf("dqmserver", "host");
113 char* dqmport = m_conf->getconf("dqmserver", "port");
114 char* interval = m_conf->getconf("processor", "historelay", "interval");
115 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
116
117 // 3. Run basf2
118 /*
119 char* basf2 = m_conf->getconf("processor", "basf2", "script");
120 if (nsmm->len > 0) {
121 basf2 = (char*) nsmm->datap;
122 printf("Configure: basf2 script overridden : %s\n", basf2);
123 }
124 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
125 */
126
127 // 4. Run receiver
128 char* receiver = m_conf->getconf("processor", "receiver", "script");
129 char* srchost = m_conf->getconf("distributor", "host");
130 // char* port = m_conf->getconf ( "distributor", "port" );
131 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
132 // char* hostbase = m_conf->getconf("processor", "nodebase");
133 int rport;
134 sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
135 rport += portbase;
136 char portchar[256];
137 sprintf(portchar, "%d", rport);
138 m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
139
140 // 5. Run EventServer
141 char* evs = m_conf->getconf("processor", "eventserver", "script");
142 char* evsport = m_conf->getconf("processor", "eventserver", "port");
143 m_pid_evs = m_proc->Execute(evs, (char*)rbufout.c_str(), evsport);
144
145 printf("Configure : done\n");
146 fflush(stdout);
147 return 0;
148
149}
150
151int ERecoEventProcessor::UnConfigure(NSMmsg*, NSMcontext*)
152{
153 // Simple implementation to stop all processes
154 // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
155 int status;
156 if (m_pid_sender != 0) {
157 printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
158 kill(m_pid_sender, SIGINT);
159 waitpid(m_pid_sender, &status, 0);
160 }
161 if (m_pid_basf2 != 0) {
162 printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
163 kill(m_pid_basf2, SIGINT);
164 waitpid(m_pid_basf2, &status, 0);
165 }
166 if (m_pid_receiver != 0) {
167 printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
168 kill(m_pid_receiver, SIGINT);
169 waitpid(m_pid_receiver, &status, 0);
170 }
171 if (m_pid_hrecv != 0) {
172 printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
173 kill(m_pid_hrecv, SIGINT);
174 waitpid(m_pid_hrecv, &status, 0);
175 }
176 if (m_pid_hrelay != 0) {
177 printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
178 kill(m_pid_hrelay, SIGINT);
179 waitpid(m_pid_hrelay, &status, 0);
180 }
181 if (m_pid_evs != 0) {
182 printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
183 kill(m_pid_evs, SIGINT);
184 waitpid(m_pid_evs, &status, 0);
185 }
186
187 // Clear RingBuffers
188 m_rbufin->forceClear();
189 m_rbufout->forceClear();
190
191 // Clear PID list
192 m_flow->fillProcessStatus(GetNodeInfo());
193
194 printf("Unconfigure : done\n");
195 fflush(stdout);
196 return 0;
197}
198
199int ERecoEventProcessor::Start(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
200{
201 // Clear DQM histogram memory
202 char cmdline[] = "hsendcommand DQMRC:CLEAR localhost 10391";
203 system(cmdline);
204 printf("ERecoEventProcessor : DQM TMemFile cleared\n");
205
206 m_rbufin->clear();
207
208 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
209 string(m_conf->getconf("processor", "ringbufin"));
210 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
211 string(m_conf->getconf("processor", "ringbufout"));
212 char* hport = m_conf->getconf("processor", "historecv", "port");
213
214
215 // 3. Run basf2
216 char* basf2 = m_conf->getconf("processor", "basf2", "script");
217 if (nsmm->len > 0) {
218 basf2 = (char*) nsmm->datap;
219 printf("Configure: basf2 script overridden : %s\n", basf2);
220 }
221 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
222
223 return 0;
224}
225
226int ERecoEventProcessor::Stop(NSMmsg*, NSMcontext*)
227{
228 /*
229 char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
230 char* filename = m_conf->getconf("processor", "dqm", "file");
231 char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
232 int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
233 int status;
234 waitpid(pid_hcollect, &status, 0);
235 */
236 int status;
237 if (m_pid_basf2 != 0) {
238 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
239 kill(m_pid_basf2, SIGINT);
240 waitpid(m_pid_basf2, &status, 0);
241 m_pid_basf2 = 0;
242 }
243 return 0;
244}
245
246
247int ERecoEventProcessor::Restart(NSMmsg*, NSMcontext*)
248{
249 printf("ERecoEventProcessor : Restarting!!!!!\n");
250 /* Original impl.
251 if (m_pid_sender != 0) {
252 printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
253 kill(m_pid_sender, SIGINT);
254 }
255 if (m_pid_basf2 != 0) {
256 printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
257 kill(m_pid_basf2, SIGINT);
258 }
259 if (m_pid_receiver != 0) {
260 printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
261 kill(m_pid_receiver, SIGINT);
262 }
263 if (m_pid_hrecv != 0) {
264 printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
265 kill(m_pid_receiver, SIGINT);
266 }
267 if (m_pid_hrelay != 0) {
268 printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
269 kill(m_pid_receiver, SIGINT);
270 }
271 // Simple implementation to stop all processes
272 system("killall basf2 sock2rbr rb2sockr hrelay hserver");
273 fflush(stdout);
274 */
275 NSMmsg* nsmmsg = NULL;
276 NSMcontext* nsmcontext = NULL;
277 ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
278 sleep(2);
279 ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
280 return 0;
281}
282
283// Server function
284
285void ERecoEventProcessor::server()
286{
287 // Clear PID list
288 m_flow->fillProcessStatus(GetNodeInfo());
289 // Start Loop
290 while (true) {
291 pid_t pid = m_proc->CheckProcess();
292 if (pid > 0) {
293 printf("ERecoEventProcessor : process dead pid=%d\n", pid);
294 if (pid == m_pid_sender) {
295 m_log->Fatal("ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
296 m_pid_sender = 0;
297 } else if (pid == m_pid_basf2) {
298 m_log->Fatal("ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
299 m_pid_basf2 = 0;
300 } else if (pid == m_pid_receiver) {
301 m_log->Fatal("ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
302 m_pid_receiver = 0;
303 } else if (pid == m_pid_hrecv) {
304 m_log->Fatal("ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
305 m_pid_hrecv = 0;
306 } else if (pid == m_pid_hrelay) {
307 m_log->Fatal("ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
308 m_pid_hrelay = 0;
309 }
310 }
311 int st = m_proc->CheckOutput();
312 if (st < 0) {
313 perror("ERecoEventProcessor::server");
314 // exit ( -1 );
315 } else if (st > 0) {
316 m_log->ProcessLog(m_proc->GetFd());
317 }
318 m_flow->fillNodeInfo(0, GetNodeInfo(), false);
319 m_flow->fillNodeInfo(1, GetNodeInfo(), true);
320 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
321 m_pid_hrecv, m_pid_hrelay);
322 }
323}
324
325
326
int Execute(char *script, int nargs, char **args)
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224