Belle II Software development
RFEventServer Class Reference
Inheritance diagram for RFEventServer:
RFServerBase

Public Member Functions

 RFEventServer (std::string conffile)
 
int Configure (NSMmsg *, NSMcontext *) override
 
int UnConfigure (NSMmsg *, NSMcontext *) override
 
int Start (NSMmsg *, NSMcontext *) override
 
int Stop (NSMmsg *, NSMcontext *) override
 
int Restart (NSMmsg *, NSMcontext *) override
 
void server ()
 
void cleanup ()
 
virtual int Pause (NSMmsg *, NSMcontext *)
 
virtual int Resume (NSMmsg *, NSMcontext *)
 
virtual int Status (NSMmsg *, NSMcontext *)
 
virtual void SetNodeInfo (RfNodeInfo *ptr)
 
virtual RfNodeInfo * GetNodeInfo ()
 

Static Public Member Functions

static RFEventServer & Create (const std::string &conffile)
 
static RFEventServer & Instance ()
 

Private Attributes

RFConf * m_conf
 
RFSharedMem * m_shm
 
RFProcessManager * m_proc
 
RFLogManager * m_log
 
RFFlowStat * m_flow
 
RingBuffer * m_rbufin
 
int m_pid_recv
 
int m_pid_sender [MAXNODES]
 
int m_nnodes
 
RfNodeInfo * m_nsmmem
 

Static Private Attributes

static RFEventServer * s_instance = 0
 

Detailed Description

Definition at line 29 of file RFEventServer.h.

Constructor & Destructor Documentation

◆ RFEventServer()

RFEventServer ( std::string  conffile)

Definition at line 28 of file RFEventServer.cc.

29{
30 // 0. Initialize configuration manager
31 m_conf = new RFConf(conffile.c_str());
32 char* nodename = m_conf->getconf("distributor", "nodename");
33 // char nodename[256];
34 // gethostname ( nodename, sizeof(nodename) );
35
36 // 1. Set execution directory
37 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
38
39 mkdir(execdir.c_str(), 0755);
40 chdir(execdir.c_str());
41
42 // 2. Initialize local shared memory
43 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
44 string(m_conf->getconf("distributor", "nodename"));
45 m_shm = new RFSharedMem((char*)shmname.c_str());
46
47 // 3. Initialize process manager
48 m_proc = new RFProcessManager(nodename);
49
50 // 4. Initialize RingBuffers
51 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
52 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
53 string(m_conf->getconf("distributor", "ringbuffer"));
54 int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
55 m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
56
57 // 5. Initialize LogManager
58 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
59
60 // 6. Initialize data flow monitor
61 m_flow = new RFFlowStat((char*)shmname.c_str());
62
63 // 7. Clear PID list
64 m_pid_recv = 0;
65 for (int i = 0; i < m_nnodes; i++)
66 m_pid_sender[i] = 0 ;
67
68}
RingBuffer: Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39

◆ ~RFEventServer()

Definition at line 70 of file RFEventServer.cc.

71{
72 delete m_log;
73 delete m_proc;
74 delete m_shm;
75 delete m_conf;
76 delete m_flow;
77 delete m_rbufin;
78}

Member Function Documentation

◆ cleanup()

void cleanup ( )

Definition at line 268 of file RFEventServer.cc.

269{
270 printf("RFEventServer : cleaning up\n");
271 UnConfigure(NULL, NULL);
272 /*
273 kill ( m_pid_recv, SIGINT );
274 int status;
275 waitpid ( m_pid_recv, &status, 0 );
276 printf ( "RFEventServer : receiver terminated.\n" );
277 for ( int i=0; i<m_nnodes; i++ ) {
278 kill ( m_pid_sender[i], SIGINT );
279 waitpid ( m_pid_sender[i], &status, 0 );
280 printf ( "RFEventServer : sender [%d] terminated.\n", i );
281 }
282 */
283 printf("RFEventServer: Done. Exitting\n");
284 exit(-1);
285}

◆ Configure()

int Configure ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 97 of file RFEventServer.cc.

98{
99 // 0. Global parameters
100 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
101 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
102 string(m_conf->getconf("distributor", "ringbuffer"));
103
104 // char* shmname = m_conf->getconf("distributor", "nodename");
105 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
106 string(m_conf->getconf("distributor", "nodename"));
107
108 // 2. Run sender
109 m_nnodes = 0;
110 int maxnodes = m_conf->getconfi("processor", "nnodes");
111 int idbase = m_conf->getconfi("processor", "idbase");
112 // char* hostbase = m_conf->getconf("processor", "hostbase");
113 char* badlist = m_conf->getconf("processor", "badlist");
114
115 char* sender = m_conf->getconf("distributor", "sender", "script");
116 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
117
118 char idname[3], shmid[3];
119 for (int i = 0; i < maxnodes; i++) {
120 sprintf(idname, "%2.2d", idbase + i);
121 sprintf(shmid, "%2.2d", i);
122 if (badlist == NULL ||
123 strstr(badlist, idname) == 0) {
124 int port = (idbase + i) + portbase;
125 char portchar[256];
126 sprintf(portchar, "%d", port);
127 m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
128 m_flow->clear(i);
129 m_nnodes++;
130 }
131 }
132
133 // 1. Run receiver
134 char* srcG = m_conf->getconf("distributor", "source");
135 if (strstr(srcG, "net") != 0) {
136 // Run receiver
137 char* receiver = m_conf->getconf("distributor", "receiver", "script");
138 char* src = m_conf->getconf("distributor", "receiver", "host");
139 char* port = m_conf->getconf("distributor", "receiver", "port");
140 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
141 char idbuf[3];
142 sprintf(idbuf, "%2.2d", RF_INPUT_ID);
143 m_pid_recv = m_proc->Execute(receiver, (char*)ringbuf.c_str(), src, port, (char*)shmname.c_str(), idbuf);
144 m_flow->clear(RF_INPUT_ID);
145 } else if (strstr(srcG, "file") != 0) {
146 // Run file reader
147 char* filein = m_conf->getconf("distributor", "fileinput", "script");
148 char* file = m_conf->getconf("distributor", "fileinput", "filename");
149 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
150 char* nnodechr = m_conf->getconf("distributor", "nnodes");
151 m_pid_recv = m_proc->Execute(filein, (char*)ringbuf.c_str(), file, nnodechr);
152 }
153
154 m_rbufin->forceClear();
155
156 // else none
157 return 0;
158}
RFProcessManager::Execute: int Execute(char *script, int nargs, char **args)
RingBuffer::forceClear: void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441

◆ Create()

RFEventServer & Create ( const std::string &  conffile)
static

Definition at line 81 of file RFEventServer.cc.

82{
83 if (!s_instance) {
84 s_instance = new RFEventServer(conffile);
85 }
86 return *s_instance;
87}

◆ GetNodeInfo()

virtual RfNodeInfo * GetNodeInfo ( )
inline virtual inherited

Definition at line 38 of file RFServerBase.h.

39 {
40 return m_nsmmem;
41 };

◆ Instance()

RFEventServer & Instance ( )
static

Definition at line 89 of file RFEventServer.cc.

90{
91 return *s_instance;
92}

◆ Pause()

virtual int Pause ( NSMmsg *,
NSMcontext *
)
inline virtual inherited

Definition at line 28 of file RFServerBase.h.

28{ return 0; };

◆ Restart()

int Restart ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 200 of file RFEventServer.cc.

201{
202 printf("RFEventServer : Restarting!!!!!\n");
203 /* Original impl.
204 if (m_pid_recv != 0) {
205 kill(m_pid_recv, SIGINT);
206 }
207 for (int i = 0; i < m_nnodes; i++) {
208 if (m_pid_sender[i] != 0) {
209 printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
210 kill(m_pid_sender[i], SIGINT);
211 }
212 }
213 // Simple implementation
214 system("killall sock2rbr rb2sockr");
215 fflush(stdout);
216 */
217 NSMmsg* nsmmsg = NULL;
218 NSMcontext* nsmcontext = NULL;
219 RFEventServer::UnConfigure(nsmmsg, nsmcontext);
220 sleep(2);
221 RFEventServer::Configure(nsmmsg, nsmcontext);
222 return 0;
223}
Definition: nsm2.h:224

◆ Resume()

virtual int Resume ( NSMmsg *,
NSMcontext *
)
inline virtual inherited

Definition at line 29 of file RFServerBase.h.

29{ return 0; };

◆ server()

void server ( )

Definition at line 227 of file RFEventServer.cc.

228{
229 // int nevt = 0;
230 m_flow->fillProcessStatus(GetNodeInfo());
231
232 while (true) {
233 int sender_id = 0;
234 pid_t pid = m_proc->CheckProcess();
235 if (pid > 0) {
236 printf("RFEventServer : process dead. pid = %d\n", pid);
237 if (pid == m_pid_recv) {
238 m_log->Fatal("RFEventServer : receiver process dead. pid=%d\n", pid);
239 m_pid_recv = 0;
240 } else {
241 for (int i = 0; i < m_nnodes; i++) {
242 if (pid == m_pid_sender[i]) {
243 m_log->Fatal("RFEventServer : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
244 m_pid_sender[i] = 0;
245 sender_id = i;
246 }
247 }
248 }
249 }
250 int st = m_proc->CheckOutput();
251 if (st < 0) {
252 perror("RFEventServer::server");
253 // exit ( -1 );
254 } else if (st > 0) {
255 m_log->ProcessLog(m_proc->GetFd());
256 }
257 m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
258 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
259 // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
260 // Debug
261 // RfNodeInfo* info = GetNodeInfo();
262 //info->nevent_in = nevt++;
263 // info->nqueue_in = nevt;
264 // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
265 }
266}

◆ SetNodeInfo()

virtual void SetNodeInfo ( RfNodeInfo * ptr)
inline virtual inherited

Definition at line 33 of file RFServerBase.h.

34 {
35 m_nsmmem = ptr;
36 };

◆ Start()

int Start ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 186 of file RFEventServer.cc.

187{
188 // m_rbufin->clear();
189 // m_rbufin->forceClear();
190 return 0;
191}

◆ Status()

virtual int Status ( NSMmsg *,
NSMcontext *
)
inline virtual inherited

Definition at line 31 of file RFServerBase.h.

31{ return 0; };

◆ Stop()

int Stop ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 193 of file RFEventServer.cc.

194{
195 m_rbufin->clear();
196 return 0;
197}
RingBuffer::clear: int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426

◆ UnConfigure()

int UnConfigure ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 160 of file RFEventServer.cc.

161{
162 // system("killall sock2rbr rb2sockr");
163 int status;
164 if (m_pid_recv != 0) {
165 kill(m_pid_recv, SIGINT);
166 waitpid(m_pid_recv, &status, 0);
167 }
168 for (int i = 0; i < m_nnodes; i++) {
169 if (m_pid_sender[i] != 0) {
170 printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
171 // kill(m_pid_sender[i], SIGINT);
172 kill(m_pid_sender[i], SIGINT);
173 waitpid(m_pid_sender[i], &status, 0);
174 }
175 }
176 // Clear RingBuffer
177 m_rbufin->forceClear();
178
179 // Clear process list
180 m_flow->fillProcessStatus(GetNodeInfo());
181
182 printf("Unconfigure : done\n");
183 return 0;
184}

Member Data Documentation

◆ m_conf

RFConf* m_conf
private

Definition at line 52 of file RFEventServer.h.

◆ m_flow

RFFlowStat* m_flow
private

Definition at line 56 of file RFEventServer.h.

◆ m_log

RFLogManager* m_log
private

Definition at line 55 of file RFEventServer.h.

◆ m_nnodes

int m_nnodes
private

Definition at line 61 of file RFEventServer.h.

◆ m_nsmmem

RfNodeInfo* m_nsmmem
private inherited

Definition at line 47 of file RFServerBase.h.

◆ m_pid_recv

int m_pid_recv
private

Definition at line 59 of file RFEventServer.h.

◆ m_pid_sender

int m_pid_sender[MAXNODES]
private

Definition at line 60 of file RFEventServer.h.

◆ m_proc

RFProcessManager* m_proc
private

Definition at line 54 of file RFEventServer.h.

◆ m_rbufin

RingBuffer* m_rbufin
private

Definition at line 57 of file RFEventServer.h.

◆ m_shm

RFSharedMem* m_shm
private

Definition at line 53 of file RFEventServer.h.

◆ s_instance

RFEventServer * s_instance = 0
static private

Definition at line 64 of file RFEventServer.h.


The documentation for this class was generated from the following files: