Belle II Software  release-05-01-25
RFEventServer.cc
1 //+
2 // File : RFEventServer.cc
3 // Description : Receive events from EVB1 and distribute them to
4 // processing nodes
5 //
6 // Author : Ryosuke Itoh, IPNS, KEK
7 // Date : 24 - June - 2013
8 //-
9 
10 #include "daq/rfarm/manager/RFEventServer.h"
11 
12 #include <sys/stat.h>
13 #include <sys/types.h>
14 #include <sys/wait.h>
15 #include <unistd.h>
16 
17 #include <csignal>
18 #include <cstring>
19 #include <iostream>
20 
21 #define RFEVSOUT stdout
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 RFEventServer* RFEventServer::s_instance = 0;
27 //RFServerBase* RFServerBase::s_instance = 0;
28 
29 RFEventServer::RFEventServer(string conffile)
30 {
31  // 0. Initialize configuration manager
32  m_conf = new RFConf(conffile.c_str());
33  char* nodename = m_conf->getconf("distributor", "nodename");
34  // char nodename[256];
35  // gethostname ( nodename, sizeof(nodename) );
36 
37  // 1. Set execution directory
38  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
39 
40  mkdir(execdir.c_str(), 0755);
41  chdir(execdir.c_str());
42 
43  // 2. Initialize local shared memory
44  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
45  string(m_conf->getconf("distributor", "nodename"));
46  m_shm = new RFSharedMem((char*)shmname.c_str());
47 
48  // 3. Initialize process manager
49  m_proc = new RFProcessManager(nodename);
50 
51  // 4. Initialize RingBuffers
52  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
53  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
54  string(m_conf->getconf("distributor", "ringbuffer"));
55  int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
56  m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
57 
58  // 5. Initialize LogManager
59  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
60 
61  // 6. Initialize data flow monitor
62  m_flow = new RFFlowStat((char*)shmname.c_str());
63 
64  // 7. Clear PID list
65  m_pid_recv = 0;
66  for (int i = 0; i < m_nnodes; i++)
67  m_pid_sender[i] = 0 ;
68 
69 }
70 
71 RFEventServer::~RFEventServer()
72 {
73  delete m_log;
74  delete m_proc;
75  delete m_shm;
76  delete m_conf;
77  delete m_flow;
78  delete m_rbufin;
79 }
80 
81 // Access to Singleton
82 RFEventServer& RFEventServer::Create(string conffile)
83 {
84  if (!s_instance) {
85  s_instance = new RFEventServer(conffile);
86  }
87  return *s_instance;
88 }
89 
90 RFEventServer& RFEventServer::Instance()
91 {
92  return *s_instance;
93 }
94 
95 
96 // Functions hooked up by NSM2
97 
98 int RFEventServer::Configure(NSMmsg*, NSMcontext*)
99 {
100  // 0. Global parameters
101  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
102  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
103  string(m_conf->getconf("distributor", "ringbuffer"));
104 
105  // char* shmname = m_conf->getconf("distributor", "nodename");
106  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
107  string(m_conf->getconf("distributor", "nodename"));
108 
109  // 2. Run sender
110  m_nnodes = 0;
111  int maxnodes = m_conf->getconfi("processor", "nnodes");
112  int idbase = m_conf->getconfi("processor", "idbase");
113  char* hostbase = m_conf->getconf("processor", "hostbase");
114  char* badlist = m_conf->getconf("processor", "badlist");
115 
116  char* sender = m_conf->getconf("distributor", "sender", "script");
117  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
118 
119  char hostname[512], idname[3], shmid[3];
120  for (int i = 0; i < maxnodes; i++) {
121  sprintf(idname, "%2.2d", idbase + i);
122  sprintf(shmid, "%2.2d", i);
123  if (badlist == NULL ||
124  strstr(badlist, idname) == 0) {
125  int port = (idbase + i) + portbase;
126  char portchar[256];
127  sprintf(portchar, "%d", port);
128  m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
129  m_flow->clear(i);
130  m_nnodes++;
131  }
132  }
133 
134  // 1. Run receiver
135  char* src = m_conf->getconf("distributor", "source");
136  if (strstr(src, "net") != 0) {
137  // Run receiver
138  char* receiver = m_conf->getconf("distributor", "receiver", "script");
139  char* src = m_conf->getconf("distributor", "receiver", "host");
140  char* port = m_conf->getconf("distributor", "receiver", "port");
141  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
142  char idbuf[3];
143  sprintf(idbuf, "%2.2d", RF_INPUT_ID);
144  m_pid_recv = m_proc->Execute(receiver, (char*)ringbuf.c_str(), src, port, (char*)shmname.c_str(), idbuf);
145  m_flow->clear(RF_INPUT_ID);
146  } else if (strstr(src, "file") != 0) {
147  // Run file reader
148  char* filein = m_conf->getconf("distributor", "fileinput", "script");
149  char* file = m_conf->getconf("distributor", "fileinput", "filename");
150  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
151  char* nnodechr = m_conf->getconf("distributor", "nnodes");
152  m_pid_recv = m_proc->Execute(filein, (char*)ringbuf.c_str(), file, nnodechr);
153  }
154 
155  m_rbufin->forceClear();
156 
157  // else none
158  return 0;
159 }
160 
161 int RFEventServer::UnConfigure(NSMmsg*, NSMcontext*)
162 {
163  // system("killall sock2rbr rb2sockr");
164  int status;
165  if (m_pid_recv != 0) {
166  kill(m_pid_recv, SIGINT);
167  waitpid(m_pid_recv, &status, 0);
168  }
169  for (int i = 0; i < m_nnodes; i++) {
170  if (m_pid_sender[i] != 0) {
171  printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
172  // kill(m_pid_sender[i], SIGINT);
173  kill(m_pid_sender[i], SIGINT);
174  waitpid(m_pid_sender[i], &status, 0);
175  }
176  }
177  // Clear RingBuffer
178  m_rbufin->forceClear();
179 
180  // Clear process list
181  m_flow->fillProcessStatus(GetNodeInfo());
182 
183  printf("Unconfigure : done\n");
184  return 0;
185 }
186 
187 int RFEventServer::Start(NSMmsg*, NSMcontext*)
188 {
189  // m_rbufin->clear();
190  // m_rbufin->forceClear();
191  return 0;
192 }
193 
194 int RFEventServer::Stop(NSMmsg*, NSMcontext*)
195 {
196  m_rbufin->clear();
197  return 0;
198 }
199 
200 
201 int RFEventServer::Restart(NSMmsg*, NSMcontext*)
202 {
203  printf("RFEventServer : Restarting!!!!!\n");
204  /* Original impl.
205  if (m_pid_recv != 0) {
206  kill(m_pid_recv, SIGINT);
207  }
208  for (int i = 0; i < m_nnodes; i++) {
209  if (m_pid_sender[i] != 0) {
210  printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
211  kill(m_pid_sender[i], SIGINT);
212  }
213  }
214  // Simple implementation
215  system("killall sock2rbr rb2sockr");
216  fflush(stdout);
217  */
218  NSMmsg* nsmmsg = NULL;
219  NSMcontext* nsmcontext = NULL;
220  RFEventServer::UnConfigure(nsmmsg, nsmcontext);
221  sleep(2);
222  RFEventServer::Configure(nsmmsg, nsmcontext);
223  return 0;
224 }
225 
226 // Server function
227 
228 void RFEventServer::server()
229 {
230  // int nevt = 0;
231  m_flow->fillProcessStatus(GetNodeInfo());
232 
233  while (true) {
234  int sender_id = 0;
235  pid_t pid = m_proc->CheckProcess();
236  if (pid > 0) {
237  printf("RFEventServer : process dead. pid = %d\n", pid);
238  if (pid == m_pid_recv) {
239  m_log->Fatal("RFEventServer : receiver process dead. pid=%d\n", pid);
240  m_pid_recv = 0;
241  } else {
242  for (int i = 0; i < m_nnodes; i++) {
243  if (pid == m_pid_sender[i]) {
244  m_log->Fatal("RFEventServer : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
245  m_pid_sender[i] = 0;
246  sender_id = i;
247  }
248  }
249  }
250  }
251  int st = m_proc->CheckOutput();
252  if (st < 0) {
253  perror("RFEventServer::server");
254  // exit ( -1 );
255  } else if (st > 0) {
256  m_log->ProcessLog(m_proc->GetFd());
257  }
258  m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
259  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
260  // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
261  // Debug
262  // RfNodeInfo* info = GetNodeInfo();
263  //info->nevent_in = nevt++;
264  // info->nqueue_in = nevt;
265  // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
266  }
267 }
268 
269 void RFEventServer::cleanup()
270 {
271  printf("RFEventServer : cleaning up\n");
272  UnConfigure(NULL, NULL);
273  /*
274  kill ( m_pid_recv, SIGINT );
275  int status;
276  waitpid ( m_pid_recv, &status, 0 );
277  printf ( "RFEventServer : receiver terminated.\n" );
278  for ( int i=0; i<m_nnodes; i++ ) {
279  kill ( m_pid_sender[i], SIGINT );
280  waitpid ( m_pid_sender[i], &status, 0 );
281  printf ( "RFEventServer : sender [%d] terminated.\n", i );
282  }
283  */
284  printf("RFEventServer: Done. Exitting\n");
285  exit(-1);
286 }
287 
Belle2::RFEventServer
Definition: RFEventServer.h:30
NSMmsg
Definition: nsm2.h:217
Belle2::RFFlowStat
Definition: RFFlowStat.h:28
Belle2::RFSharedMem
Definition: RFSharedMem.h:51
Belle2::RFLogManager
Definition: RFLogManager.h:18
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::RFProcessManager
Definition: RFProcessManager.h:22
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RFConf
Definition: RFConf.h:24
NSMcontext_struct
Definition: nsmlib2.h:66