Belle II Software  release-05-01-25
RFOutputServer.cc
1 //+
2 // File : RFOutputServer.cc
3 // Description : Collect outputs from worker nodes and send them to EVB2
4 // w/ branch to PXD
5 //
6 // Author : Ryosuke Itoh, IPNS, KEK
7 // Date : 24 - June - 2013
8 //-
9 
10 #include "daq/rfarm/manager/RFOutputServer.h"
11 
12 #include <sys/stat.h>
13 #include <sys/types.h>
14 #include <sys/wait.h>
15 
16 #include <unistd.h>
17 
18 #include <csignal>
19 #include <cstring>
20 
21 #define RFOTSOUT stdout
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 RFOutputServer::RFOutputServer(string conffile)
27 {
28  // 0. Initialize configuration manager
29  m_conf = new RFConf(conffile.c_str());
30  char* nodename = m_conf->getconf("collector", "nodename");
31  // char nodename[256];
32  // gethostname(nodename, sizeof(nodename));
33 
34  // 1. Set execution directory
35  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/collector";
36 
37  mkdir(execdir.c_str(), 0755);
38  chdir(execdir.c_str());
39 
40  // 2. Initialize local shared memory
41  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
42  string(nodename);
43  m_shm = new RFSharedMem((char*)shmname.c_str());
44 
45  // 3. Initialize RingBuffers
46  // char* rbufin = m_conf->getconf("collector", "ringbufin");
47  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
48  string(m_conf->getconf("collector", "ringbufin"));
49  int rbinsize = m_conf->getconfi("collector", "ringbufinsize");
50  m_rbufin = new RingBuffer(rbufin.c_str(), rbinsize);
51  // char* rbufout = m_conf->getconf("collector", "ringbufout");
52  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
53  string(m_conf->getconf("collector", "ringbufout"));
54  int rboutsize = m_conf->getconfi("collector", "ringbufoutsize");
55  m_rbufout = new RingBuffer(rbufout.c_str(), rboutsize);
56 
57  // 4. Initialize process manager
58  m_proc = new RFProcessManager(nodename);
59 
60  // 5. Initialize LogManager
61  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
62 
63  // 6. Initialize data flow monitor
64  m_flow = new RFFlowStat((char*)shmname.c_str());
65 
66  // 7. Clear PID list
67  m_pid_sender = 0;
68  m_pid_basf2 = 0;
69  m_nnodes = m_conf->getconfi("processor", "nnodes");
70  for (int i = 0; i < m_nnodes; i++)
71  m_pid_receiver[i] = 0;
72 
73 }
74 
75 RFOutputServer::~RFOutputServer()
76 {
77  delete m_log;
78  delete m_proc;
79  delete m_shm;
80  delete m_conf;
81  delete m_rbufin;
82  delete m_rbufout;
83  delete m_flow;
84 }
85 
86 
87 // Functions hooked up by NSM2
88 
89 int RFOutputServer::Configure(NSMmsg* nsmm, NSMcontext* nsmc)
90 {
91  // Start processes from down stream
92 
93  // 0. Get common parameters
94  // char* rbufin = m_conf->getconf("collector", "ringbufin");
95  string rbufin = string(m_conf->getconf("system", "unitname")) + ":" +
96  string(m_conf->getconf("collector", "ringbufin"));
97  // char* rbufout = m_conf->getconf("collector", "ringbufout");
98  string rbufout = string(m_conf->getconf("system", "unitname")) + ":" +
99  string(m_conf->getconf("collector", "ringbufout"));
100 
101  // char* shmname = m_conf->getconf("collector", "nodename");
102  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
103  string(m_conf->getconf("collector", "nodename"));
104 
105  // 1. Histogram Receiver
106  char* hrecv = m_conf->getconf("collector", "historecv", "script");
107  char* hport = m_conf->getconf("collector", "historecv", "port");
108  char* mapfile = m_conf->getconf("collector", "historecv", "mapfile");
109  // m_pid_hrecv = m_proc->Execute(hrecv, hport, mapfile);
110 
111  // 2. Histogram Relay
112  char* hrelay = m_conf->getconf("collector", "historelay", "script");
113  char* dqmdest = m_conf->getconf("dqmserver", "host");
114  char* dqmport = m_conf->getconf("dqmserver", "port");
115  char* interval = m_conf->getconf("collector", "historelay", "interval");
116  // m_pid_hrelay = m_proc->Execute(hrelay, mapfile, dqmdest, dqmport, interval);
117 
118  // 3. Run sender / logger
119  char* src = m_conf->getconf("collector", "destination");
120  if (strstr(src, "net") != 0) {
121  // Run sender
122  char* sender = m_conf->getconf("collector", "sender", "script");
123  char* port = m_conf->getconf("collector", "sender", "port");
124  char idbuf[3];
125  sprintf(idbuf, "%2.2d", RF_OUTPUT_ID);
126  m_pid_sender = m_proc->Execute(sender, (char*)rbufout.c_str(), port, (char*)shmname.c_str(), idbuf);
127  m_flow->clear(RF_OUTPUT_ID);
128  } else if (strstr(src, "file") != 0) {
129  // Run file writer
130  char* writer = m_conf->getconf("collector", "writer", "script");
131  char* file = m_conf->getconf("collector", "writer", "filename");
132  char* nnode = m_conf->getconf("processor", "nnodes");
133  m_pid_sender = m_proc->Execute(writer, (char*)rbufout.c_str(), file, nnode);
134  } else {
135  // Do not run anything
136  }
137 
138  // 4. Run basf2
139  char* basf2 = m_conf->getconf("collector", "basf2", "script");
140  if (nsmm->len > 0) {
141  basf2 = (char*) nsmm->datap;
142  printf("Configure: basf2 script overridden : %s\n", basf2);
143  }
144  m_pid_basf2 = m_proc->Execute(basf2, (char*)rbufin.c_str(), (char*)rbufout.c_str(), hport);
145 
146  // 5. Run receiver
147  m_nnodes = 0;
148  int maxnodes = m_conf->getconfi("processor", "nnodes");
149  int idbase = m_conf->getconfi("processor", "idbase");
150  // char* hostbase = m_conf->getconf("processor", "hostbase");
151  char* hostbase = m_conf->getconf("processor", "hostbase");
152  char* badlist = m_conf->getconf("processor", "badlist");
153  char* port = m_conf->getconf("processor", "sender", "port");
154 
155  char* receiver = m_conf->getconf("collector", "receiver", "script");
156  char hostname[512], idname[3], shmid[3];
157  for (int i = 0; i < maxnodes; i++) {
158  sprintf(idname, "%2.2d", idbase + i);
159  sprintf(shmid, "%2.2d", i);
160  if (badlist == NULL ||
161  strstr(badlist, idname) == 0) {
162  sprintf(hostname, "%s%2.2d", hostbase, idbase + i);
163  m_pid_receiver[m_nnodes] = m_proc->Execute(receiver, (char*)rbufin.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
164  m_flow->clear(i);
165  m_nnodes++;
166  }
167  }
168 
169  m_rbufin->forceClear();
170  m_rbufout->forceClear();
171  return 0;
172 }
173 
174 int RFOutputServer::UnConfigure(NSMmsg*, NSMcontext*)
175 {
176  // system("killall sock2rbr rb2sockr basf2 hrelay hserver");
177 
178  printf("m_pid_sender = %d\n", m_pid_sender);
179  printf("m_pid_basf2 = %d\n", m_pid_basf2);
180  fflush(stdout);
181  int status, ws;
182  if (m_pid_sender != 0) {
183  printf("killing sender %d\n", m_pid_sender);
184  // kill(m_pid_sender, SIGINT);
185  kill(m_pid_sender, SIGINT);
186  ws = waitpid(m_pid_sender, &status, 0);
187  printf("wait return = %d, status = %d\n", ws, status);
188  }
189 
190  if (m_pid_basf2 != 0) {
191  printf("killing sender %d\n", m_pid_sender);
192  kill(m_pid_basf2, SIGINT);
193  ws = waitpid(m_pid_basf2, &status, 0);
194  printf("wait return = %d, status = %d\n", ws, status);
195  }
196 
197  for (int i = 0; i < m_nnodes; i++) {
198  if (m_pid_receiver[i] != 0) {
199  printf("killing receiver %d\n", m_pid_receiver[i]);
200  kill(m_pid_receiver[i], SIGINT);
201  ws = waitpid(m_pid_receiver[i], &status, 0);
202  printf("wait return = %d, status = %d\n", ws, status);
203  }
204  }
205 
206  // Clear RingBuffer
207  m_rbufin->forceClear();
208  m_rbufout->forceClear();
209 
210  // Clear process list
211  m_flow->fillProcessStatus(GetNodeInfo());
212 
213  printf("Unconfigure done\n");
214  fflush(stdout);
215  return 0;
216 }
217 
218 int RFOutputServer::Start(NSMmsg*, NSMcontext*)
219 {
220  // Clear RingBuffer
221  m_rbufout->forceClear();
222  // m_rbufin->forceClear();
223  return 0;
224 }
225 
226 int RFOutputServer::Stop(NSMmsg*, NSMcontext*)
227 {
228  return 0;
229 }
230 
231 
232 int RFOutputServer::Restart(NSMmsg*, NSMcontext*)
233 {
234  printf("RFOutputServer : Restarting!!!!!!\n");
235  /* Original Impl
236  kill(m_pid_sender, SIGINT);
237  kill(m_pid_basf2, SIGINT);
238  for (int i = 0; i < m_nnodes; i++) {
239  kill(m_pid_receiver[i], SIGINT);
240  }
241  // Simple Implementation
242  system("killall sock2rbr rb2sockr basf2 hrelay hserver");
243  fflush(stdout);
244  */
245  NSMmsg* nsmmsg = NULL;
246  NSMcontext* nsmcontext = NULL;
247  RFOutputServer::UnConfigure(nsmmsg, nsmcontext);
248  sleep(2);
249  RFOutputServer::Configure(nsmmsg, nsmcontext);
250  return 0;
251 }
252 
253 // Server function
254 
255 void RFOutputServer::server()
256 {
257  m_flow->fillProcessStatus(GetNodeInfo());
258  while (true) {
259  int recv_id = 0;
260  pid_t pid = m_proc->CheckProcess();
261  if (pid > 0) {
262  printf("RFOutputServer : process dead. pid=%d\n", pid);
263  if (pid == m_pid_sender) {
264  m_log->Fatal("RFOutputServer : sender process dead. pid=%d\n", pid);
265  m_pid_sender = 0;
266  } else if (pid == m_pid_basf2) {
267  m_log->Fatal("RFOutputServer : basf2 process dead. pid=%d\n", pid);
268  m_pid_basf2 = 0;
269  } else {
270  for (int i = 0; i < m_nnodes; i++) {
271  if (pid == m_pid_receiver[i]) {
272  m_log->Fatal("RFOutputServer : receiver process %d dead. pid=%d\n", i, pid);
273  m_pid_receiver[i] = 0;
274  recv_id = i;
275  }
276  }
277  }
278  }
279 
280  int st = m_proc->CheckOutput();
281  if (st < 0) {
282  perror("RFOutputServer::server");
283  // exit ( -1 );
284  } else if (st > 0) {
285  m_log->ProcessLog(m_proc->GetFd());
286  }
287  m_flow->fillNodeInfo(RF_OUTPUT_ID, GetNodeInfo(), true);
288  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_receiver[recv_id], m_pid_sender,
289  m_pid_basf2);
290  }
291 }
292 void RFOutputServer::cleanup()
293 {
294  printf("RFOutputServer : cleaning up\n");
295  UnConfigure(NULL, NULL);
296  printf("RFOutputServer: Done. Exitting\n");
297  exit(-1);
298 }
299 
300 
301 
NSMmsg
Definition: nsm2.h:217
Belle2::RFFlowStat
Definition: RFFlowStat.h:28
Belle2::RFSharedMem
Definition: RFSharedMem.h:51
Belle2::RFLogManager
Definition: RFLogManager.h:18
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::RFProcessManager
Definition: RFProcessManager.h:22
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RFConf
Definition: RFConf.h:24
NSMcontext_struct
Definition: nsmlib2.h:66