Belle II Software development
ERecoDistributor Class Reference
Inheritance diagram for ERecoDistributor:
RFServerBase

Public Member Functions

 ERecoDistributor (std::string conffile)
 
int Configure (NSMmsg *, NSMcontext *) override
 
int UnConfigure (NSMmsg *, NSMcontext *) override
 
int Start (NSMmsg *, NSMcontext *) override
 
int Stop (NSMmsg *, NSMcontext *) override
 
int Restart (NSMmsg *, NSMcontext *) override
 
void server ()
 
virtual int Pause (NSMmsg *, NSMcontext *)
 
virtual int Resume (NSMmsg *, NSMcontext *)
 
virtual int Status (NSMmsg *, NSMcontext *)
 
virtual void SetNodeInfo (RfNodeInfo *ptr)
 
virtual RfNodeInfo * GetNodeInfo ()
 

Static Public Attributes

static RFServerBase * s_instance
 

Private Attributes

RFConf * m_conf
 
RFSharedMem * m_shm
 
RFProcessManager * m_proc
 
RFLogManager * m_log
 
RFFlowStat * m_flow
 
RingBuffer * m_rbufin
 
int m_pid_recv [MAXNODES]
 
int m_pid_sender [MAXNODES]
 
int m_nnodes
 
int m_nrecv
 
RfNodeInfo * m_nsmmem
 

Detailed Description

Definition at line 29 of file ERecoDistributor.h.

Constructor & Destructor Documentation

◆ ERecoDistributor()

ERecoDistributor ( std::string  conffile)

Definition at line 25 of file ERecoDistributor.cc.

26{
27 // 0. Initialize configuration manager
28 m_conf = new RFConf(conffile.c_str());
29 char* nodename = m_conf->getconf("distributor", "nodename");
30 // char nodename[256];
31 // gethostname ( nodename, sizeof(nodename) );
32
33 // 1. Set execution directory
34 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
35
36 mkdir(execdir.c_str(), 0755);
37 chdir(execdir.c_str());
38
39 // 2. Initialize local shared memory
40 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
41 string(m_conf->getconf("distributor", "nodename"));
42 m_shm = new RFSharedMem((char*)shmname.c_str());
43
44 // 3. Initialize process manager
45 m_proc = new RFProcessManager(nodename);
46
47 // 4. Initialize RingBuffers
48 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
49 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
50 string(m_conf->getconf("distributor", "ringbuffer"));
51 int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
52 m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
53
54 // 5. Initialize LogManager
55 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
56
57 // 6. Initialize data flow monitor
58 m_flow = new RFFlowStat((char*)shmname.c_str());
59
60}
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39

◆ ~ERecoDistributor()

Definition at line 62 of file ERecoDistributor.cc.

63{
64 delete m_log;
65 delete m_proc;
66 delete m_shm;
67 delete m_conf;
68 delete m_flow;
69 delete m_rbufin;
70}

Member Function Documentation

◆ Configure()

int Configure ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 75 of file ERecoDistributor.cc.

76{
77 // 0. Global parameters
78 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
79 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
80 string(m_conf->getconf("distributor", "ringbuffer"));
81
82 // char* shmname = m_conf->getconf("distributor", "nodename");
83 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
84 string(m_conf->getconf("distributor", "nodename"));
85
86 // 1. Run sender
87 m_nnodes = 0;
88 int maxnodes = m_conf->getconfi("processor", "nnodes");
89 int idbase = m_conf->getconfi("processor", "idbase");
90 //char* hostbase = m_conf->getconf("processor", "hostbase");
91 char* badlist = m_conf->getconf("processor", "badlist");
92
93 char* sender = m_conf->getconf("distributor", "sender", "script");
94 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
95
96
97 char hostname[512], idname[3], shmid[3];
98 for (int i = 0; i < maxnodes; i++) {
99 sprintf(idname, "%2.2d", idbase + i);
100 sprintf(shmid, "%2.2d", i);
101 if (badlist == NULL ||
102 strstr(badlist, idname) == 0) {
103 int port = (idbase + i) + portbase;
104 char portchar[256];
105 sprintf(portchar, "%d", port);
106 m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
107 printf("Running sender to %d\n", port);
108 fflush(stdout);
109 m_nnodes++;
110 }
111 }
112
113 // 2. Run receiver
114 m_nrecv = 0;
115 int maxrecv = m_conf->getconfi("distributor", "receiver", "nnodes");
116 int ridbase = m_conf->getconfi("distributor", "receiver", "idbase");
117 char* rhostbase = m_conf->getconf("distributor", "receiver", "hostbase");
118 char* rbadlist = m_conf->getconf("distributor", "receiver", "badlist");
119 char* port = m_conf->getconf("distributor", "receiver", "port");
120 char* receiver = m_conf->getconf("distributor", "receiver", "script");
121
122 // char hostname[512], idname[3], shmid[3];
123 for (int i = 0; i < maxrecv; i++) {
124 sprintf(hostname, "%s%2.2d", rhostbase, ridbase + i);
125 sprintf(idname, "%2.2d", ridbase + i);
126 sprintf(shmid, "%2.2d", i + 20);
127 if (rbadlist == NULL ||
128 strstr(rbadlist, hostname) == 0) {
129 printf("Running receiver for %s\n", hostname);
130 fflush(stdout);
131 m_pid_recv[m_nrecv] = m_proc->Execute(receiver, (char*)ringbuf.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
132 m_nrecv++;
133 }
134 }
135
136 printf("ERecoDistributor : Configure done\n");
137 return 0;
138}
int Execute(char *script, int nargs, char **args)

◆ GetNodeInfo()

virtual RfNodeInfo * GetNodeInfo ( )
inline virtual inherited

Definition at line 38 of file RFServerBase.h.

39 {
40 return m_nsmmem;
41 };

◆ Pause()

virtual int Pause ( NSMmsg *,
NSMcontext *
)
inline virtual inherited

Definition at line 28 of file RFServerBase.h.

28{ return 0; };

◆ Restart()

int Restart ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 186 of file ERecoDistributor.cc.

187{
188 printf("ERecoDistributor : Restarting!!!!!\n");
189 NSMmsg* nsmmsg = NULL;
190 NSMcontext* nsmcontext = NULL;
191 ERecoDistributor::UnConfigure(nsmmsg, nsmcontext);
192 sleep(2);
193 ERecoDistributor::Configure(nsmmsg, nsmcontext);
194 return 0;
195}
Definition: nsm2.h:224

◆ Resume()

virtual int Resume ( NSMmsg *,
NSMcontext *
)
inline virtual inherited

Definition at line 29 of file RFServerBase.h.

29{ return 0; };

◆ server()

void server ( )

Definition at line 199 of file ERecoDistributor.cc.

200{
201 // int nevt = 0;
202 m_flow->fillProcessStatus(GetNodeInfo());
203
204 while (true) {
205 pid_t pid = m_proc->CheckProcess();
206 if (pid > 0) {
207 printf("ERecoDistributor : process dead. pid = %d\n", pid);
208 for (int i = 0; i < m_nrecv; i++) {
209 if (pid == m_pid_recv[i]) {
210 m_log->Fatal("ERecoDistributor : receiver process (%d) dead. pid=%d\n", i, pid);
211 m_pid_recv[i] = 0;
212 }
213 }
214 for (int i = 0; i < m_nnodes; i++) {
215 if (pid == m_pid_sender[i]) {
216 m_log->Fatal("ERecoDistributor : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
217 m_pid_sender[i] = 0;
218 }
219 }
220 }
221
222 int st = m_proc->CheckOutput();
223 if (st < 0) {
224 perror("ERecoDistributor::server");
225 // exit ( -1 );
226 } else if (st > 0) {
227 m_log->ProcessLog(m_proc->GetFd());
228 }
229 }
230 // m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
231 // m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
232 // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
233 // Debug
234 // RfNodeInfo* info = GetNodeInfo();
235 //info->nevent_in = nevt++;
236 // info->nqueue_in = nevt;
237 // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
238}

◆ SetNodeInfo()

virtual void SetNodeInfo ( RfNodeInfo * ptr)
inline virtual inherited

Definition at line 33 of file RFServerBase.h.

34 {
35 m_nsmmem = ptr;
36 };

◆ Start()

int Start ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 173 of file ERecoDistributor.cc.

174{
175 m_rbufin->clear();
176 return 0;
177}
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426

◆ Status()

virtual int Status ( NSMmsg *,
NSMcontext *
)
inline virtual inherited

Definition at line 31 of file RFServerBase.h.

31{ return 0; };

◆ Stop()

int Stop ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 179 of file ERecoDistributor.cc.

180{
181 // m_rbufin->clear();
182 return 0;
183}

◆ UnConfigure()

int UnConfigure ( NSMmsg *,
NSMcontext *
)
override virtual

Reimplemented from RFServerBase.

Definition at line 140 of file ERecoDistributor.cc.

141{
142 // system("killall sock2rbr rb2sockr");
143 int status;
144
145 printf("m_nrecv = %d\n", m_nrecv);
146 for (int i = 0; i < m_nrecv; i++) {
147 if (m_pid_recv[i] != 0) {
148 printf("ERecoDistributor : killing receiver pid=%d\n", m_pid_recv[i]);
149 kill(m_pid_recv[i], SIGKILL);
150 waitpid(m_pid_recv[i], &status, 0);
151 }
152 }
153
154 printf("m_nnodes = %d\n", m_nnodes);
155 for (int i = 0; i < m_nnodes; i++) {
156 if (m_pid_sender[i] != 0) {
157 printf("ERecoDistributor : killing sender pid=%d\n", m_pid_sender[i]);
158 // kill(m_pid_sender[i], SIGINT);
159 kill(m_pid_sender[i], SIGKILL);
160 waitpid(m_pid_sender[i], &status, 0);
161 }
162 }
163 // Clear RingBuffer
164 m_rbufin->forceClear();
165
166 // Clear process list
167 m_flow->fillProcessStatus(GetNodeInfo());
168
169 printf("ERecoDistributor : Unconfigure done\n");
170 return 0;
171}
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441

Member Data Documentation

◆ m_conf

RFConf* m_conf
private

Definition at line 45 of file ERecoDistributor.h.

◆ m_flow

RFFlowStat* m_flow
private

Definition at line 49 of file ERecoDistributor.h.

◆ m_log

RFLogManager* m_log
private

Definition at line 48 of file ERecoDistributor.h.

◆ m_nnodes

int m_nnodes
private

Definition at line 54 of file ERecoDistributor.h.

◆ m_nrecv

int m_nrecv
private

Definition at line 55 of file ERecoDistributor.h.

◆ m_nsmmem

RfNodeInfo* m_nsmmem
private inherited

Definition at line 47 of file RFServerBase.h.

◆ m_pid_recv

int m_pid_recv[MAXNODES]
private

Definition at line 52 of file ERecoDistributor.h.

◆ m_pid_sender

int m_pid_sender[MAXNODES]
private

Definition at line 53 of file ERecoDistributor.h.

◆ m_proc

RFProcessManager* m_proc
private

Definition at line 47 of file ERecoDistributor.h.

◆ m_rbufin

RingBuffer* m_rbufin
private

Definition at line 50 of file ERecoDistributor.h.

◆ m_shm

RFSharedMem* m_shm
private

Definition at line 46 of file ERecoDistributor.h.

◆ s_instance

RFServerBase* s_instance
static inherited

Definition at line 44 of file RFServerBase.h.


The documentation for this class was generated from the following files: