Belle II Software development
RFEventProcessor Class Reference
Inheritance diagram for RFEventProcessor:
RFServerBase

Public Member Functions

 RFEventProcessor (std::string conffile)
 
int Configure (NSMmsg *, NSMcontext *) override
 
int UnConfigure (NSMmsg *, NSMcontext *) override
 
int Start (NSMmsg *, NSMcontext *) override
 
int Stop (NSMmsg *, NSMcontext *) override
 
int Restart (NSMmsg *, NSMcontext *) override
 
void server ()
 
void cleanup ()
 
virtual int Pause (NSMmsg *, NSMcontext *)
 
virtual int Resume (NSMmsg *, NSMcontext *)
 
virtual int Status (NSMmsg *, NSMcontext *)
 
virtual void SetNodeInfo (RfNodeInfo *ptr)
 
virtual RfNodeInfo * GetNodeInfo ()
 

Static Public Attributes

static RFServerBase * s_instance
 

Private Attributes

RFConf * m_conf
 
RFSharedMem * m_shm
 
RFProcessManager * m_proc
 
RFLogManager * m_log
 
RingBuffer * m_rbufin
 
RingBuffer * m_rbufout
 
RFFlowStat * m_flow
 
int m_pid_receiver
 
int m_pid_basf2
 
int m_pid_sender
 
int m_pid_hrecv
 
int m_pid_hrelay
 
char m_nodename [256+4]
 
int m_expno
 
int m_runno
 
RfNodeInfo * m_nsmmem
 

Detailed Description

Definition at line 31 of file RFEventProcessor.h.

Constructor & Destructor Documentation

◆ RFEventProcessor()

RFEventProcessor ( std::string  conffile)

Definition at line 26 of file RFEventProcessor.cc.

27{
28 // 0. Initialize configuration manager
29 m_conf = new RFConf(conffile.c_str());
30 // char* nodename = m_conf->getconf ( "processor", "nodename" );
31 // char nodename[256];
32 strcpy(m_nodename, "evp_");
33#ifndef DESY
34 gethostname(&m_nodename[4], sizeof(m_nodename) - 4);
35#else
36 // Special treatment for DESY test nodes!!
37 char hostnamebuf[256];
38 gethostname(hostnamebuf, sizeof(hostnamebuf));
39 strcat(&m_nodename[4], &hostnamebuf[6]);
40 int lend = strlen(m_nodename);
41 m_nodename[lend + 1] = (char)0;
42 m_nodename[lend] = m_nodename[lend - 1];
43 strncpy(&m_nodename[lend - 1], "0", 1);
44 // End of DESY special treatment
45#endif
46 printf("nodename = %s\n", m_nodename);
47
48 // 1. Set execution directory
49 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
50 printf("execdir = %s\n", execdir.c_str());
51
52 mkdir(execdir.c_str(), 0755);
53 chdir(execdir.c_str());
54
55 // 2. Initialize local shared memory
56 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
57 string(m_nodename);
58 m_shm = new RFSharedMem((char*)shmname.c_str());
59
60 // 3. Initialize process manager
61 m_proc = new RFProcessManager(m_nodename);
62
63 // 4. Initialize RingBuffers
64 // char* rbufin = m_conf->getconf("processor", "ringbufin");
65 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
66 string(m_conf->getconf("processor", "ringbufin"));
67 int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
68 // m_rbufin = new RingBuffer(rbufin, rbinsize);
69 m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
70 // char* rbufout = m_conf->getconf("processor", "ringbufout");
71 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
72 string(m_conf->getconf("processor", "ringbufout"));
73 int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
74 // m_rbufout = new RingBuffer(rbufout, rboutsize);
75 m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
76
77 // 5. Initialize LogManager
78 m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
79
80 // 6. Initialize data flow monitor
81 m_flow = new RFFlowStat((char*)shmname.c_str());
82
83 // 7. Clear PID list
84 m_pid_sender = 0;
85 m_pid_basf2 = 0;
86 m_pid_receiver = 0;
87 m_pid_hrecv = 0;
88 m_pid_hrelay = 0;
89
90}
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39

◆ ~RFEventProcessor()

Definition at line 92 of file RFEventProcessor.cc.

93{
94 delete m_log;
95 delete m_proc;
96 delete m_shm;
97 delete m_conf;
98 delete m_flow;
99 delete m_rbufin;
100 delete m_rbufout;
101}

Member Function Documentation

◆ cleanup()

void cleanup ( )

Definition at line 392 of file RFEventProcessor.cc.

393{
394 printf("RFEventProcessor : cleaning up\n");
395 UnConfigure(NULL, NULL);
396 printf("RFEventProcessor: Done. Exitting\n");
397 exit(-1);
398}

◆ Configure()

int Configure ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 106 of file RFEventProcessor.cc.

107{
108 // Start processes from down stream
109
110 // 0. Get common parameters
111 // char* rbufin = m_conf->getconf("processor", "ringbufin");
112 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
113 string(m_conf->getconf("processor", "ringbufin"));
114 // char* rbufout = m_conf->getconf("processor", "ringbufout");
115 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
116 string(m_conf->getconf("processor", "ringbufout"));
117
118 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
119 string(m_nodename);
120
121 // 1. Histogram Receiver
122 char* hrecv = m_conf->getconf("processor", "historecv", "script");
123 char* hport = m_conf->getconf("processor", "historecv", "port");
124 char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
125 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
126
127 sleep(5); // make sure that TMapFile is created.
128
129 // 2. Histogram Relay
130 char* hrelay = m_conf->getconf("processor", "historelay", "script");
131 char* dqmdest = m_conf->getconf("dqmserver", "host");
132 char* dqmport = m_conf->getconf("dqmserver", "port");
133 char* interval = m_conf->getconf("processor", "historelay", "interval");
134 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
135
136 // 3. Run sender / logger
137 char* sender = m_conf->getconf("processor", "sender", "script");
138 char* port = m_conf->getconf("processor", "sender", "port");
139 m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), (char*)"1");
140 m_flow->clear(1);
141
142 /*
143 // 4. Run basf2
144 char* basf2 = m_conf->getconf("processor", "basf2", "script");
145 if (nsmm->len > 0) {
146 basf2 = (char*) nsmm->datap;
147 printf("Configure: basf2 script overridden : %s\n", basf2);
148 }
149 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
150 */
151
152 // 5. Run receiver
153 char* receiver = m_conf->getconf("processor", "receiver", "script");
154 char* srchost = m_conf->getconf("distributor", "host");
155 // char* port = m_conf->getconf ( "distributor", "port" );
156 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
157 // char* hostbase = m_conf->getconf("processor", "nodebase");
158 /* OLD impl
159 int baselen = strlen(hostbase);
160 char hostname[256];
161 gethostname(hostname, sizeof(hostname));
162 char id[3];
163 strcpy(id, &hostname[baselen + 1]);
164 int rport = atoi(id) + portbase;
165 */
166 int rport;
167 sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
168 rport += portbase;
169 char portchar[256];
170 sprintf(portchar, "%d", rport);
171 m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
172 m_flow->clear(0);
173
174 printf("Configure : done\n");
175 fflush(stdout);
176
177 // 6 Clear RingBuffers
178 m_rbufin->forceClear();
179 m_rbufout->forceClear();
180
181 return 0;
182
183}
int Execute(char *script, int nargs, char **args)
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441

◆ GetNodeInfo()

virtual RfNodeInfo * GetNodeInfo ( )
inline virtual inherited

Definition at line 38 of file RFServerBase.h.

39 {
40 return m_nsmmem;
41 };

◆ Pause()

virtual int Pause ( NSMmsg * ,
NSMcontext * 
)
inline virtual inherited

Definition at line 28 of file RFServerBase.h.

28{ return 0; };

◆ Restart()

int Restart ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 314 of file RFEventProcessor.cc.

315{
316 printf("RFEventProcessor : Restarting!!!!!\n");
317 /* Original impl.
318 if (m_pid_sender != 0) {
319 printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
320 kill(m_pid_sender, SIGINT);
321 }
322 if (m_pid_basf2 != 0) {
323 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
324 kill(m_pid_basf2, SIGINT);
325 }
326 if (m_pid_receiver != 0) {
327 printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
328 kill(m_pid_receiver, SIGINT);
329 }
330 if (m_pid_hrecv != 0) {
331 printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
332 kill(m_pid_receiver, SIGINT);
333 }
334 if (m_pid_hrelay != 0) {
335 printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
336 kill(m_pid_receiver, SIGINT);
337 }
338 // Simple implementation to stop all processes
339 system("killall basf2 sock2rbr rb2sockr hrelay hserver");
340 fflush(stdout);
341 */
342 NSMmsg* nsmmsg = NULL;
343 NSMcontext* nsmcontext = NULL;
344 RFEventProcessor::UnConfigure(nsmmsg, nsmcontext);
345 sleep(2);
346 RFEventProcessor::Configure(nsmmsg, nsmcontext);
347 return 0;
348}
Definition: nsm2.h:224

◆ Resume()

virtual int Resume ( NSMmsg * ,
NSMcontext * 
)
inline virtual inherited

Definition at line 29 of file RFServerBase.h.

29{ return 0; };

◆ server()

void server ( )

Definition at line 352 of file RFEventProcessor.cc.

353{
354 // Clear PID list
355 m_flow->fillProcessStatus(GetNodeInfo());
356 // Start Loop
357 while (true) {
358 pid_t pid = m_proc->CheckProcess();
359 if (pid > 0) {
360 printf("RFEventProcessor : process dead pid=%d\n", pid);
361 if (pid == m_pid_sender) {
362 m_log->Fatal("RFEventProcessor : sender dead. pid=%d\n", m_pid_sender);
363 m_pid_sender = 0;
364 } else if (pid == m_pid_basf2) {
365 m_log->Fatal("RFEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
366 m_pid_basf2 = 0;
367 } else if (pid == m_pid_receiver) {
368 m_log->Fatal("RFEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
369 m_pid_receiver = 0;
370 } else if (pid == m_pid_hrecv) {
371 m_log->Fatal("RFEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
372 m_pid_hrecv = 0;
373 } else if (pid == m_pid_hrelay) {
374 m_log->Fatal("RFEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
375 m_pid_hrelay = 0;
376 }
377 }
378 int st = m_proc->CheckOutput();
379 if (st < 0) {
380 perror("RFEventProcessor::server");
381 // exit ( -1 );
382 } else if (st > 0) {
383 m_log->ProcessLog(m_proc->GetFd());
384 }
385 m_flow->fillNodeInfo(0, GetNodeInfo(), false);
386 m_flow->fillNodeInfo(1, GetNodeInfo(), true);
387 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
388 m_pid_hrecv, m_pid_hrelay);
389 }
390}

◆ SetNodeInfo()

virtual void SetNodeInfo ( RfNodeInfo * ptr)
inline virtual inherited

Definition at line 33 of file RFServerBase.h.

34 {
35 m_nsmmem = ptr;
36 };

◆ Start()

int Start ( NSMmsg * nsmm,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 235 of file RFEventProcessor.cc.

236{
237 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
238 string(m_conf->getconf("processor", "ringbufin"));
239 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
240 string(m_conf->getconf("processor", "ringbufout"));
241 char* hport = m_conf->getconf("processor", "historecv", "port");
242
243 // 0. Set run numbers
244 m_expno = nsmm->pars[0];
245 m_runno = nsmm->pars[1];
246
247 // 1. Clear RingBuffers
248 m_rbufout->forceClear();
249 m_rbufin->forceClear();
250
251 // 2. Run basf2
252 char* basf2 = m_conf->getconf("processor", "basf2", "script");
253 if (nsmm->len > 0) {
254 basf2 = (char*) nsmm->datap;
255 printf("Configure: basf2 script overridden : %s\n", basf2);
256 }
257 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
258
259 return 0;
260}

◆ Status()

virtual int Status ( NSMmsg * ,
NSMcontext * 
)
inline virtual inherited

Definition at line 31 of file RFServerBase.h.

31{ return 0; };

◆ Stop()

int Stop ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 262 of file RFEventProcessor.cc.

263{
264 printf("RFEventProcessor : STOP processing started\n");
265 // fflush ( stdout );
266 /*
267 char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
268 char* filename = m_conf->getconf("processor", "dqm", "file");
269 char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
270 int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
271 int status;
272 waitpid(pid_hcollect, &status, 0);
273 */
274
275 /*
276 // Need to wait until all the events are processed and sent
277 RfNodeInfo* nodeinfo = GetNodeInfo();
278 int ncount = 0;
279 for (;;) {
280 RfShm_Cell& cellin = m_flow->getinfo(0);
281 RfShm_Cell& cellout = m_flow->getinfo(1);
282 if (cellin.nqueue == 0 && cellout.nqueue == 0) break;
283 printf("inqueue = %d : outqueue = %d\n", cellin.nqueue, cellout.nqueue);
284 if (ncount > 30) {
285 printf("RFEventProcessor : Timeout (30sec) in stopping\n");
286 break;
287 }
288 sleep(1);
289 ncount ++;
290 }
291 */
292 // Checking number of processed events using RfShm_Cell does not work as expected and is
293 // equivalent to wait for 30 sec. Just replaced with 5 sec waiting.
294 sleep(5);
295
296 int status;
297 if (m_pid_basf2 != 0) {
298 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
299 kill(m_pid_basf2, SIGINT);
300 waitpid(m_pid_basf2, &status, 0);
301 m_pid_basf2 = 0;
302 char outfile[1024];
303 sprintf(outfile, "dqm_e%4.4dr%6.6d.root", m_expno, m_runno);
304 std::rename("histofile.root", outfile);
305 printf("output file name = %s\n", outfile);
306 fflush(stdout);
307
308 }
309 printf("RFEventProcessor : STOP processing done\n");
310 return 0;
311}

◆ UnConfigure()

int UnConfigure ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 185 of file RFEventProcessor.cc.

186{
187 // Simple implementation to stop all processes
188 // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
189
190 // Emergency stop
191 system("killall -9 python");
192
193 // Normal abort
194 int status;
195 if (m_pid_sender != 0) {
196 printf("RFEventProcessor : killing sender pid=%d\n", m_pid_sender);
197 kill(m_pid_sender, SIGINT);
198 waitpid(m_pid_sender, &status, 0);
199 }
200 if (m_pid_basf2 != 0) {
201 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
202 // kill(m_pid_basf2, SIGINT);
203 kill(m_pid_basf2, SIGINT);
204 waitpid(m_pid_basf2, &status, 0);
205 m_pid_basf2 = 0;
206 }
207 if (m_pid_receiver != 0) {
208 printf("RFEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
209 kill(m_pid_receiver, SIGINT);
210 waitpid(m_pid_receiver, &status, 0);
211 }
212 if (m_pid_hrecv != 0) {
213 printf("RFEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
214 kill(m_pid_hrecv, SIGINT);
215 waitpid(m_pid_hrecv, &status, 0);
216 }
217 if (m_pid_hrelay != 0) {
218 printf("RFEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
219 kill(m_pid_hrelay, SIGINT);
220 waitpid(m_pid_hrelay, &status, 0);
221 }
222
223 // Clear RingBuffers
224 m_rbufin->forceClear();
225 m_rbufout->forceClear();
226
227 // Clear PID list
228 m_flow->fillProcessStatus(GetNodeInfo());
229
230 printf("Unconfigure : done\n");
231 fflush(stdout);
232 return 0;
233}

Member Data Documentation

◆ m_conf

RFConf* m_conf
private

Definition at line 50 of file RFEventProcessor.h.

◆ m_expno

int m_expno
private

Definition at line 65 of file RFEventProcessor.h.

◆ m_flow

RFFlowStat* m_flow
private

Definition at line 56 of file RFEventProcessor.h.

◆ m_log

RFLogManager* m_log
private

Definition at line 53 of file RFEventProcessor.h.

◆ m_nodename

char m_nodename[256+4]
private

Definition at line 63 of file RFEventProcessor.h.

◆ m_nsmmem

RfNodeInfo* m_nsmmem
private inherited

Definition at line 47 of file RFServerBase.h.

◆ m_pid_basf2

int m_pid_basf2
private

Definition at line 59 of file RFEventProcessor.h.

◆ m_pid_hrecv

int m_pid_hrecv
private

Definition at line 61 of file RFEventProcessor.h.

◆ m_pid_hrelay

int m_pid_hrelay
private

Definition at line 62 of file RFEventProcessor.h.

◆ m_pid_receiver

int m_pid_receiver
private

Definition at line 58 of file RFEventProcessor.h.

◆ m_pid_sender

int m_pid_sender
private

Definition at line 60 of file RFEventProcessor.h.

◆ m_proc

RFProcessManager* m_proc
private

Definition at line 52 of file RFEventProcessor.h.

◆ m_rbufin

RingBuffer* m_rbufin
private

Definition at line 54 of file RFEventProcessor.h.

◆ m_rbufout

RingBuffer* m_rbufout
private

Definition at line 55 of file RFEventProcessor.h.

◆ m_runno

int m_runno
private

Definition at line 66 of file RFEventProcessor.h.

◆ m_shm

RFSharedMem* m_shm
private

Definition at line 51 of file RFEventProcessor.h.

◆ s_instance

RFServerBase* s_instance
static inherited

Definition at line 44 of file RFServerBase.h.


The documentation for this class was generated from the following files: