Belle II Software development
RFOutputServer Class Reference
Inheritance diagram for RFOutputServer:
RFServerBase

Public Member Functions

 RFOutputServer (std::string conffile)
 
int Configure (NSMmsg *, NSMcontext *) override
 
int UnConfigure (NSMmsg *, NSMcontext *) override
 
int Start (NSMmsg *, NSMcontext *) override
 
int Stop (NSMmsg *, NSMcontext *) override
 
int Restart (NSMmsg *, NSMcontext *) override
 
void server ()
 
void cleanup ()
 
virtual int Pause (NSMmsg *, NSMcontext *)
 
virtual int Resume (NSMmsg *, NSMcontext *)
 
virtual int Status (NSMmsg *, NSMcontext *)
 
virtual void SetNodeInfo (RfNodeInfo *ptr)
 
virtual RfNodeInfo * GetNodeInfo ()
 

Static Public Attributes

static RFServerBase * s_instance
 

Private Attributes

RFConf * m_conf
 
RFSharedMem * m_shm
 
RFProcessManager * m_proc
 
RFLogManager * m_log
 
RFFlowStat * m_flow
 
RingBuffer * m_rbufin
 
RingBuffer * m_rbufout
 
int m_pid_receiver [MAXNODES]
 
int m_pid_basf2
 
int m_pid_sender
 
int m_nnodes
 
RfNodeInfo * m_nsmmem
 

Detailed Description

Definition at line 29 of file RFOutputServer.h.

Constructor & Destructor Documentation

◆ RFOutputServer()

RFOutputServer ( std::string  conffile)

Definition at line 25 of file RFOutputServer.cc.

26{
27 // 0. Initialize configuration manager
28 m_conf = new RFConf(conffile.c_str());
29 char* nodename = m_conf->getconf("collector", "nodename");
30 // char nodename[256];
31 // gethostname(nodename, sizeof(nodename));
32
33 // 1. Set execution directory
34 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/collector";
35
36 mkdir(execdir.c_str(), 0755);
37 chdir(execdir.c_str());
38
39 // 2. Initialize local shared memory
40 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
41 string(nodename);
42 m_shm = new RFSharedMem((char*)shmname.c_str());
43
44 // 3. Initialize RingBuffers
45 // char* rbufin = m_conf->getconf("collector", "ringbufin");
46 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
47 string(m_conf->getconf("collector", "ringbufin"));
48 int rbinsize = m_conf->getconfi("collector", "ringbufinsize");
49 m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
50 // char* rbufout = m_conf->getconf("collector", "ringbufout");
51 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
52 string(m_conf->getconf("collector", "ringbufout"));
53 int rboutsize = m_conf->getconfi("collector", "ringbufoutsize");
54 m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
55
56 // 4. Initialize process manager
57 m_proc = new RFProcessManager(nodename);
58
59 // 5. Initialize LogManager
60 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
61
62 // 6. Initialize data flow monitor
63 m_flow = new RFFlowStat((char*)shmname.c_str());
64
65 // 7. Clear PID list
66 m_pid_sender = 0;
67 m_pid_basf2 = 0;
68 m_nnodes = m_conf->getconfi("processor", "nnodes");
69 for (int i = 0; i < m_nnodes; i++)
70 m_pid_receiver[i] = 0;
71
72}
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39

◆ ~RFOutputServer()

Definition at line 74 of file RFOutputServer.cc.

75{
76 delete m_log;
77 delete m_proc;
78 delete m_shm;
79 delete m_conf;
80 delete m_rbufin;
81 delete m_rbufout;
82 delete m_flow;
83}

Member Function Documentation

◆ cleanup()

void cleanup ( )

Definition at line 291 of file RFOutputServer.cc.

292{
293 printf("RFOutputServer : cleaning up\n");
294 UnConfigure(NULL, NULL);
295 printf("RFOutputServer: Done. Exitting\n");
296 exit(-1);
297}

◆ Configure()

int Configure ( NSMmsg * nsmm,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 88 of file RFOutputServer.cc.

89{
90 // Start processes from down stream
91
92 // 0. Get common parameters
93 // char* rbufin = m_conf->getconf("collector", "ringbufin");
94 string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
95 string(m_conf->getconf("collector", "ringbufin"));
96 // char* rbufout = m_conf->getconf("collector", "ringbufout");
97 string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
98 string(m_conf->getconf("collector", "ringbufout"));
99
100 // char* shmname = m_conf->getconf("collector", "nodename");
101 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
102 string(m_conf->getconf("collector", "nodename"));
103
104 // 1. Histogram Receiver
105 // char* hrecv = m_conf->getconf("collector", "historecv", "script");
106 char* hport = m_conf->getconf("collector", "historecv", "port");
107 // char* mapfile = m_conf->getconf("collector", "historecv", "mapfile");
108 // m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
109
110 // 2. Histogram Relay
111 // char* hrelay = m_conf->getconf("collector", "historelay", "script");
112 // char* dqmdest = m_conf->getconf("dqmserver", "host");
113 // char* dqmport = m_conf->getconf("dqmserver", "port");
114 // char* interval = m_conf->getconf("collector", "historelay", "interval");
115 // m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
116
117 // 3. Run sender / logger
118 char* src = m_conf->getconf("collector", "destination");
119 if (strstr(src, "net") != 0) {
120 // Run sender
121 char* sender = m_conf->getconf("collector", "sender", "script");
122 char* port = m_conf->getconf("collector", "sender", "port");
123 char idbuf[3];
124 sprintf(idbuf, "%2.2d", RF_OUTPUT_ID);
125 m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), idbuf);
126 m_flow->clear(RF_OUTPUT_ID);
127 } else if (strstr(src, "file") != 0) {
128 // Run file writer
129 char* writer = m_conf->getconf("collector", "writer", "script");
130 char* file = m_conf->getconf("collector", "writer", "filename");
131 char* nnode = m_conf->getconf("processor", "nnodes");
132 m_pid_sender = m_proc->Execute(writer, (char*)rbufout.c_str(), file, nnode);
133 } else {
134 // Do not run anything
135 }
136
137 // 4. Run basf2
138 char* basf2 = m_conf->getconf("collector", "basf2", "script");
139 if (nsmm->len > 0) {
140 basf2 = (char*) nsmm->datap;
141 printf("Configure: basf2 script overridden : %s\n", basf2);
142 }
143 m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
144
145 // 5. Run receiver
146 m_nnodes = 0;
147 int maxnodes = m_conf->getconfi("processor", "nnodes");
148 int idbase = m_conf->getconfi("processor", "idbase");
149 // char* hostbase = m_conf->getconf("processor", "hostbase");
150 char* hostbase = m_conf->getconf("processor", "hostbase");
151 char* badlist = m_conf->getconf("processor", "badlist");
152 char* port = m_conf->getconf("processor", "sender", "port");
153
154 char* receiver = m_conf->getconf("collector", "receiver", "script");
155 char hostname[512], idname[3], shmid[3];
156 for (int i = 0; i < maxnodes; i++) {
157 sprintf(idname, "%2.2d", idbase + i);
158 sprintf(shmid, "%2.2d", i);
159 if (badlist == NULL ||
160 strstr(badlist, idname) == 0) {
161 sprintf(hostname, "%s%2.2d", hostbase, idbase + i);
162 m_pid_receiver[m_nnodes] = m_proc->Execute(receiver, (char*)rbufin.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
163 m_flow->clear(i);
164 m_nnodes++;
165 }
166 }
167
168 m_rbufin->forceClear();
169 m_rbufout->forceClear();
170 return 0;
171}
int Execute(char *script, int nargs, char **args)
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441

◆ GetNodeInfo()

virtual RfNodeInfo * GetNodeInfo ( )
inline virtual inherited

Definition at line 38 of file RFServerBase.h.

39 {
40 return m_nsmmem;
41 };

◆ Pause()

virtual int Pause ( NSMmsg * ,
NSMcontext * 
)
inline virtual inherited

Definition at line 28 of file RFServerBase.h.

28{ return 0; };

◆ Restart()

int Restart ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 231 of file RFOutputServer.cc.

232{
233 printf("RFOutputServer : Restarting!!!!!!\n");
234 /* Original Impl
235 kill(m_pid_sender, SIGINT);
236 kill(m_pid_basf2, SIGINT);
237 for (int i = 0; i < m_nnodes; i++) {
238 kill(m_pid_receiver[i], SIGINT);
239 }
240 // Simple Implementation
241 system("killall sock2rbr rb2sockr basf2 hrelay hserver");
242 fflush(stdout);
243 */
244 NSMmsg* nsmmsg = NULL;
245 NSMcontext* nsmcontext = NULL;
246 RFOutputServer::UnConfigure(nsmmsg, nsmcontext);
247 sleep(2);
248 RFOutputServer::Configure(nsmmsg, nsmcontext);
249 return 0;
250}
Definition: nsm2.h:224

◆ Resume()

virtual int Resume ( NSMmsg * ,
NSMcontext * 
)
inline virtual inherited

Definition at line 29 of file RFServerBase.h.

29{ return 0; };

◆ server()

void server ( )

Definition at line 254 of file RFOutputServer.cc.

255{
256 m_flow->fillProcessStatus(GetNodeInfo());
257 while (true) {
258 int recv_id = 0;
259 pid_t pid = m_proc->CheckProcess();
260 if (pid > 0) {
261 printf("RFOutputServer : process dead. pid=%d\n", pid);
262 if (pid == m_pid_sender) {
263 m_log->Fatal("RFOutputServer : sender process dead. pid=%d\n", pid);
264 m_pid_sender = 0;
265 } else if (pid == m_pid_basf2) {
266 m_log->Fatal("RFOutputServer : basf2 process dead. pid=%d\n", pid);
267 m_pid_basf2 = 0;
268 } else {
269 for (int i = 0; i < m_nnodes; i++) {
270 if (pid == m_pid_receiver[i]) {
271 m_log->Fatal("RFOutputServer : receiver process %d dead. pid=%d\n", i, pid);
272 m_pid_receiver[i] = 0;
273 recv_id = i;
274 }
275 }
276 }
277 }
278
279 int st = m_proc->CheckOutput();
280 if (st < 0) {
281 perror("RFOutputServer::server");
282 // exit ( -1 );
283 } else if (st > 0) {
284 m_log->ProcessLog(m_proc->GetFd());
285 }
286 m_flow->fillNodeInfo(RF_OUTPUT_ID, GetNodeInfo(), true);
287 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver[recv_id], m_pid_sender,
288 m_pid_basf2);
289 }
290}

◆ SetNodeInfo()

virtual void SetNodeInfo ( RfNodeInfo * ptr)
inline virtual inherited

Definition at line 33 of file RFServerBase.h.

34 {
35 m_nsmmem = ptr;
36 };

◆ Start()

int Start ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 217 of file RFOutputServer.cc.

218{
219 // Clear RingBuffer
220 m_rbufout->forceClear();
221 // m_rbufin->forceClear();
222 return 0;
223}

◆ Status()

virtual int Status ( NSMmsg * ,
NSMcontext * 
)
inline virtual inherited

Definition at line 31 of file RFServerBase.h.

31{ return 0; };

◆ Stop()

int Stop ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 225 of file RFOutputServer.cc.

226{
227 return 0;
228}

◆ UnConfigure()

int UnConfigure ( NSMmsg * ,
NSMcontext * 
)
override virtual

Reimplemented from RFServerBase.

Definition at line 173 of file RFOutputServer.cc.

174{
175 // system("killall sock2rbr rb2sockr basf2 hrelay hserver");
176
177 printf("m_pid_sender = %d\n", m_pid_sender);
178 printf("m_pid_basf2 = %d\n", m_pid_basf2);
179 fflush(stdout);
180 int status, ws;
181 if (m_pid_sender != 0) {
182 printf("killing sender %d\n", m_pid_sender);
183 // kill(m_pid_sender, SIGINT);
184 kill(m_pid_sender, SIGINT);
185 ws = waitpid(m_pid_sender, &status, 0);
186 printf("wait return = %d, status = %d\n", ws, status);
187 }
188
189 if (m_pid_basf2 != 0) {
190 printf("killing sender %d\n", m_pid_sender);
191 kill(m_pid_basf2, SIGINT);
192 ws = waitpid(m_pid_basf2, &status, 0);
193 printf("wait return = %d, status = %d\n", ws, status);
194 }
195
196 for (int i = 0; i < m_nnodes; i++) {
197 if (m_pid_receiver[i] != 0) {
198 printf("killing receiver %d\n", m_pid_receiver[i]);
199 kill(m_pid_receiver[i], SIGINT);
200 ws = waitpid(m_pid_receiver[i], &status, 0);
201 printf("wait return = %d, status = %d\n", ws, status);
202 }
203 }
204
205 // Clear RingBuffer
206 m_rbufin->forceClear();
207 m_rbufout->forceClear();
208
209 // Clear process list
210 m_flow->fillProcessStatus(GetNodeInfo());
211
212 printf("Unconfigure done\n");
213 fflush(stdout);
214 return 0;
215}

Member Data Documentation

◆ m_conf

RFConf* m_conf
private

Definition at line 48 of file RFOutputServer.h.

◆ m_flow

RFFlowStat* m_flow
private

Definition at line 52 of file RFOutputServer.h.

◆ m_log

RFLogManager* m_log
private

Definition at line 51 of file RFOutputServer.h.

◆ m_nnodes

int m_nnodes
private

Definition at line 61 of file RFOutputServer.h.

◆ m_nsmmem

RfNodeInfo* m_nsmmem
private inherited

Definition at line 47 of file RFServerBase.h.

◆ m_pid_basf2

int m_pid_basf2
private

Definition at line 57 of file RFOutputServer.h.

◆ m_pid_receiver

int m_pid_receiver[MAXNODES]
private

Definition at line 56 of file RFOutputServer.h.

◆ m_pid_sender

int m_pid_sender
private

Definition at line 58 of file RFOutputServer.h.

◆ m_proc

RFProcessManager* m_proc
private

Definition at line 50 of file RFOutputServer.h.

◆ m_rbufin

RingBuffer* m_rbufin
private

Definition at line 53 of file RFOutputServer.h.

◆ m_rbufout

RingBuffer* m_rbufout
private

Definition at line 54 of file RFOutputServer.h.

◆ m_shm

RFSharedMem* m_shm
private

Definition at line 49 of file RFOutputServer.h.

◆ s_instance

RFServerBase* s_instance
static inherited

Definition at line 44 of file RFServerBase.h.


The documentation for this class was generated from the following files: