Belle II Software development
ERecoEventProcessor Class Reference
Inheritance diagram for ERecoEventProcessor:
RFServerBase

Public Member Functions

 ERecoEventProcessor (std::string conffile)
 
int Configure (NSMmsg *, NSMcontext *) override
 
int UnConfigure (NSMmsg *, NSMcontext *) override
 
int Start (NSMmsg *, NSMcontext *) override
 
int Stop (NSMmsg *, NSMcontext *) override
 
int Restart (NSMmsg *, NSMcontext *) override
 
void server ()
 
virtual int Pause (NSMmsg *, NSMcontext *)
 
virtual int Resume (NSMmsg *, NSMcontext *)
 
virtual int Status (NSMmsg *, NSMcontext *)
 
virtual void SetNodeInfo (RfNodeInfo *ptr)
 
virtual RfNodeInfo * GetNodeInfo ()
 

Static Public Attributes

static RFServerBase * s_instance
 

Private Attributes

RFConf * m_conf
 
RFSharedMem * m_shm
 
RFProcessManager * m_proc
 
RFLogManager * m_log
 
RingBuffer * m_rbufin
 
RingBuffer * m_rbufout
 
RFFlowStat * m_flow
 
int m_pid_receiver
 
int m_pid_basf2
 
int m_pid_sender
 
int m_pid_hrecv
 
int m_pid_hrelay
 
int m_pid_evs
 
char m_nodename [256+4]
 
RfNodeInfo * m_nsmmem
 

Detailed Description

Definition at line 31 of file ERecoEventProcessor.h.

Constructor & Destructor Documentation

◆ ERecoEventProcessor()

ERecoEventProcessor ( std::string  conffile)

Definition at line 25 of file ERecoEventProcessor.cc.

26{
27 // 0. Initialize configuration manager
28 m_conf = new RFConf(conffile.c_str());
29 // char* nodename = m_conf->getconf ( "processor", "nodename" );
30 // char nodename[256];
31 strcpy(m_nodename, "evp_");
32
33 gethostname(&m_nodename[4], sizeof(m_nodename) - 4);
34 printf("nodename = %s\n", m_nodename);
35
36 // 1. Set execution directory
37 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/" + string(m_nodename);
38 printf("execdir = %s\n", execdir.c_str());
39
40 mkdir(execdir.c_str(), 0755);
41 chdir(execdir.c_str());
42
43 // 2. Initialize local shared memory
44 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
45 string(m_nodename);
46 m_shm = new RFSharedMem((char*)shmname.c_str());
47
48 // 3. Initialize process manager
49 m_proc = new RFProcessManager(m_nodename);
50
51 // 4. Initialize RingBuffers
52 // char* rbufin = m_conf->getconf("processor", "ringbufin");
53 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
54 string(m_conf->getconf("processor", "ringbufin"));
55 int rbinsize = m_conf->getconfi("processor", "ringbufinsize");
56 // m_rbufin = new RingBuffer(rbufin, rbinsize);
57 m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
58 // char* rbufout = m_conf->getconf("processor", "ringbufout");
59 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
60 string(m_conf->getconf("processor", "ringbufout"));
61 int rboutsize = m_conf->getconfi("processor", "ringbufoutsize");
62 // m_rbufout = new RingBuffer(rbufout, rboutsize);
63 m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
64
65 // 5. Initialize LogManager
66 m_log = new RFLogManager(m_nodename, m_conf->getconf("system", "lognode"));
67
68 // 6. Initialize data flow monitor
69 m_flow = new RFFlowStat((char*)shmname.c_str());
70
71}
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39

◆ ~ERecoEventProcessor()

Definition at line 73 of file ERecoEventProcessor.cc.

74{
75 delete m_log;
76 delete m_proc;
77 delete m_shm;
78 delete m_conf;
79 delete m_flow;
80 delete m_rbufin;
81 delete m_rbufout;
82}

Member Function Documentation

◆ Configure()

int Configure ( NSMmsg * ,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 87 of file ERecoEventProcessor.cc.

88{
89 // Start processes from down stream
90
91 // 0. Get common parameters
92 // char* rbufin = m_conf->getconf("processor", "ringbufin");
93 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
94 string(m_conf->getconf("processor", "ringbufin"));
95 // char* rbufout = m_conf->getconf("processor", "ringbufout");
96 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
97 string(m_conf->getconf("processor", "ringbufout"));
98
99 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
100 string(m_nodename);
101
102 // 1. Histogram Receiver
103 char* hrecv = m_conf->getconf("processor", "historecv", "script");
104 char* hport = m_conf->getconf("processor", "historecv", "port");
105 char* mapfile = m_conf->getconf("processor", "historecv", "mapfile");
106 m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
107
108 sleep(5); // make sure that TMapFile is created.
109
110 // 2. Histogram Relay
111 char* hrelay = m_conf->getconf("processor", "historelay", "script");
112 char* dqmdest = m_conf->getconf("dqmserver", "host");
113 char* dqmport = m_conf->getconf("dqmserver", "port");
114 char* interval = m_conf->getconf("processor", "historelay", "interval");
115 m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
116
117 // 3. Run basf2
118 /*
119 char* basf2 = m_conf->getconf("processor", "basf2", "script");
120 if (nsmm->len > 0) {
121 basf2 = (char*) nsmm->datap;
122 printf("Configure: basf2 script overridden : %s\n", basf2);
123 }
124 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
125 */
126
127 // 4. Run receiver
128 char* receiver = m_conf->getconf("processor", "receiver", "script");
129 char* srchost = m_conf->getconf("distributor", "host");
130 // char* port = m_conf->getconf ( "distributor", "port" );
131 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
132 // char* hostbase = m_conf->getconf("processor", "nodebase");
133 int rport;
134 sscanf(&m_nodename[strlen(m_nodename) - 2], "%d", &rport);
135 rport += portbase;
136 char portchar[256];
137 sprintf(portchar, "%d", rport);
138 m_pid_receiver = m_proc->Execute(receiver, (char*)rbufin.c_str(), srchost, portchar, (char*)shmname.c_str(), (char*)"0");
139
140 // 5. Run EventServer
141 char* evs = m_conf->getconf("processor", "eventserver", "script");
142 char* evsport = m_conf->getconf("processor", "eventserver", "port");
143 m_pid_evs = m_proc->Execute(evs, (char*)rbufout.c_str(), evsport);
144
145 printf("Configure : done\n");
146 fflush(stdout);
147 return 0;
148
149}
int Execute(char *script, int nargs, char **args)

◆ GetNodeInfo()

virtual RfNodeInfo * GetNodeInfo ( )
inline virtual inherited

Definition at line 38 of file RFServerBase.h.

39 {
40 return m_nsmmem;
41 };

◆ Pause()

virtual int Pause ( NSMmsg * ,
NSMcontext *
)
inline virtual inherited

Definition at line 28 of file RFServerBase.h.

28{ return 0; };

◆ Restart()

int Restart ( NSMmsg * ,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 247 of file ERecoEventProcessor.cc.

248{
249 printf("ERecoEventProcessor : Restarting!!!!!\n");
250 /* Original impl.
251 if (m_pid_sender != 0) {
252 printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
253 kill(m_pid_sender, SIGINT);
254 }
255 if (m_pid_basf2 != 0) {
256 printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
257 kill(m_pid_basf2, SIGINT);
258 }
259 if (m_pid_receiver != 0) {
260 printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
261 kill(m_pid_receiver, SIGINT);
262 }
263 if (m_pid_hrecv != 0) {
264 printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
265 kill(m_pid_receiver, SIGINT);
266 }
267 if (m_pid_hrelay != 0) {
268 printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
269 kill(m_pid_receiver, SIGINT);
270 }
271 // Simple implementation to stop all processes
272 system("killall basf2 sock2rbr rb2sockr hrelay hserver");
273 fflush(stdout);
274 */
275 NSMmsg* nsmmsg = NULL;
276 NSMcontext* nsmcontext = NULL;
277 ERecoEventProcessor::UnConfigure(nsmmsg, nsmcontext);
278 sleep(2);
279 ERecoEventProcessor::Configure(nsmmsg, nsmcontext);
280 return 0;
281}
Definition: nsm2.h:224

◆ Resume()

virtual int Resume ( NSMmsg * ,
NSMcontext *
)
inline virtual inherited

Definition at line 29 of file RFServerBase.h.

29{ return 0; };

◆ server()

void server ( )

Definition at line 285 of file ERecoEventProcessor.cc.

286{
287 // Clear PID list
288 m_flow->fillProcessStatus(GetNodeInfo());
289 // Start Loop
290 while (true) {
291 pid_t pid = m_proc->CheckProcess();
292 if (pid > 0) {
293 printf("ERecoEventProcessor : process dead pid=%d\n", pid);
294 if (pid == m_pid_sender) {
295 m_log->Fatal("ERecoEventProcessor : sender dead. pid=%d\n", m_pid_sender);
296 m_pid_sender = 0;
297 } else if (pid == m_pid_basf2) {
298 m_log->Fatal("ERecoEventProcessor : basf2 dead. pid=%d\n", m_pid_basf2);
299 m_pid_basf2 = 0;
300 } else if (pid == m_pid_receiver) {
301 m_log->Fatal("ERecoEventProcessor : receiver dead. pid=%d\n", m_pid_receiver);
302 m_pid_receiver = 0;
303 } else if (pid == m_pid_hrecv) {
304 m_log->Fatal("ERecoEventProcessor : hserver dead. pid=%d\n", m_pid_hrecv);
305 m_pid_hrecv = 0;
306 } else if (pid == m_pid_hrelay) {
307 m_log->Fatal("ERecoEventProcessor : hrelay dead. pid=%d\n", m_pid_hrelay);
308 m_pid_hrelay = 0;
309 }
310 }
311 int st = m_proc->CheckOutput();
312 if (st < 0) {
313 perror("ERecoEventProcessor::server");
314 // exit ( -1 );
315 } else if (st > 0) {
316 m_log->ProcessLog(m_proc->GetFd());
317 }
318 m_flow->fillNodeInfo(0, GetNodeInfo(), false);
319 m_flow->fillNodeInfo(1, GetNodeInfo(), true);
320 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver, m_pid_sender, m_pid_basf2,
321 m_pid_hrecv, m_pid_hrelay);
322 }
323}

◆ SetNodeInfo()

virtual void SetNodeInfo ( RfNodeInfo * ptr)
inline virtual inherited

Definition at line 33 of file RFServerBase.h.

34 {
35 m_nsmmem = ptr;
36 };

◆ Start()

int Start ( NSMmsg * nsmm,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 199 of file ERecoEventProcessor.cc.

200{
201 // Clear DQM histogram memory
202 char cmdline[] = "hsendcommand DQMRC:CLEAR localhost 10391";
203 system(cmdline);
204 printf("ERecoEventProcessor : DQM TMemFile cleared\n");
205
206 m_rbufin->clear();
207
208 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
209 string(m_conf->getconf("processor", "ringbufin"));
210 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
211 string(m_conf->getconf("processor", "ringbufout"));
212 char* hport = m_conf->getconf("processor", "historecv", "port");
213
214
215 // 3. Run basf2
216 char* basf2 = m_conf->getconf("processor", "basf2", "script");
217 if (nsmm->len > 0) {
218 basf2 = (char*) nsmm->datap;
219 printf("Configure: basf2 script overridden : %s\n", basf2);
220 }
221 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
222
223 return 0;
224}
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426

◆ Status()

virtual int Status ( NSMmsg * ,
NSMcontext *
)
inline virtual inherited

Definition at line 31 of file RFServerBase.h.

31{ return 0; };

◆ Stop()

int Stop ( NSMmsg * ,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 226 of file ERecoEventProcessor.cc.

227{
228 /*
229 char* hcollect = m_conf->getconf("processor", "dqm", "hcollect");
230 char* filename = m_conf->getconf("processor", "dqm", "file");
231 char* nprocs = m_conf->getconf("processor", "basf2", "nprocs");
232 int pid_hcollect = m_proc->Execute(hcollect, filename, nprocs);
233 int status;
234 waitpid(pid_hcollect, &status, 0);
235 */
236 int status;
237 if (m_pid_basf2 != 0) {
238 printf("RFEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
239 kill(m_pid_basf2, SIGINT);
240 waitpid(m_pid_basf2, &status, 0);
241 m_pid_basf2 = 0;
242 }
243 return 0;
244}

◆ UnConfigure()

int UnConfigure ( NSMmsg * ,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 151 of file ERecoEventProcessor.cc.

152{
153 // Simple implementation to stop all processes
154 // system("killall basf2 sock2rbr rb2sockr hrelay hserver");
155 int status;
156 if (m_pid_sender != 0) {
157 printf("ERecoEventProcessor : killing sender pid=%d\n", m_pid_sender);
158 kill(m_pid_sender, SIGINT);
159 waitpid(m_pid_sender, &status, 0);
160 }
161 if (m_pid_basf2 != 0) {
162 printf("ERecoEventProcessor : killing basf2 pid=%d\n", m_pid_basf2);
163 kill(m_pid_basf2, SIGINT);
164 waitpid(m_pid_basf2, &status, 0);
165 }
166 if (m_pid_receiver != 0) {
167 printf("ERecoEventProcessor : killing receiver pid=%d\n", m_pid_receiver);
168 kill(m_pid_receiver, SIGINT);
169 waitpid(m_pid_receiver, &status, 0);
170 }
171 if (m_pid_hrecv != 0) {
172 printf("ERecoEventProcessor : killing hserver pid=%d\n", m_pid_hrecv);
173 kill(m_pid_hrecv, SIGINT);
174 waitpid(m_pid_hrecv, &status, 0);
175 }
176 if (m_pid_hrelay != 0) {
177 printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
178 kill(m_pid_hrelay, SIGINT);
179 waitpid(m_pid_hrelay, &status, 0);
180 }
181 if (m_pid_evs != 0) {
182 printf("ERecoEventProcessor : killing hrelay pid=%d\n", m_pid_hrelay);
183 kill(m_pid_evs, SIGINT);
184 waitpid(m_pid_evs, &status, 0);
185 }
186
187 // Clear RingBuffers
188 m_rbufin->forceClear();
189 m_rbufout->forceClear();
190
191 // Clear PID list
192 m_flow->fillProcessStatus(GetNodeInfo());
193
194 printf("Unconfigure : done\n");
195 fflush(stdout);
196 return 0;
197}
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441

Member Data Documentation

◆ m_conf

RFConf* m_conf
private

Definition at line 47 of file ERecoEventProcessor.h.

◆ m_flow

RFFlowStat* m_flow
private

Definition at line 53 of file ERecoEventProcessor.h.

◆ m_log

RFLogManager* m_log
private

Definition at line 50 of file ERecoEventProcessor.h.

◆ m_nodename

char m_nodename[256+4]
private

Definition at line 61 of file ERecoEventProcessor.h.

◆ m_nsmmem

RfNodeInfo* m_nsmmem
private inherited

Definition at line 47 of file RFServerBase.h.

◆ m_pid_basf2

int m_pid_basf2
private

Definition at line 56 of file ERecoEventProcessor.h.

◆ m_pid_evs

int m_pid_evs
private

Definition at line 60 of file ERecoEventProcessor.h.

◆ m_pid_hrecv

int m_pid_hrecv
private

Definition at line 58 of file ERecoEventProcessor.h.

◆ m_pid_hrelay

int m_pid_hrelay
private

Definition at line 59 of file ERecoEventProcessor.h.

◆ m_pid_receiver

int m_pid_receiver
private

Definition at line 55 of file ERecoEventProcessor.h.

◆ m_pid_sender

int m_pid_sender
private

Definition at line 57 of file ERecoEventProcessor.h.

◆ m_proc

RFProcessManager* m_proc
private

Definition at line 49 of file ERecoEventProcessor.h.

◆ m_rbufin

RingBuffer* m_rbufin
private

Definition at line 51 of file ERecoEventProcessor.h.

◆ m_rbufout

RingBuffer* m_rbufout
private

Definition at line 52 of file ERecoEventProcessor.h.

◆ m_shm

RFSharedMem* m_shm
private

Definition at line 48 of file ERecoEventProcessor.h.

◆ s_instance

RFServerBase* s_instance
static inherited

Definition at line 44 of file RFServerBase.h.


The documentation for this class was generated from the following files: