Belle II Software  release-08-01-10
RFEventServer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/rfarm/manager/RFEventServer.h"
10 
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <iterator>
19 
20 #define RFEVSOUT stdout
21 
22 using namespace std;
23 using namespace Belle2;
24 
25 RFEventServer* RFEventServer::s_instance = 0;
26 //RFServerBase* RFServerBase::s_instance = 0;
27 
28 RFEventServer::RFEventServer(string conffile)
29 {
30  // 0. Initialize configuration manager
31  m_conf = new RFConf(conffile.c_str());
32  char* nodename = m_conf->getconf("distributor", "nodename");
33  // char nodename[256];
34  // gethostname ( nodename, sizeof(nodename) );
35 
36  // 1. Set execution directory
37  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
38 
39  mkdir(execdir.c_str(), 0755);
40  chdir(execdir.c_str());
41 
42  // 2. Initialize local shared memory
43  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
44  string(m_conf->getconf("distributor", "nodename"));
45  m_shm = new RFSharedMem((char*)shmname.c_str());
46 
47  // 3. Initialize process manager
48  m_proc = new RFProcessManager(nodename);
49 
50  // 4. Initialize RingBuffers
51  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
52  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
53  string(m_conf->getconf("distributor", "ringbuffer"));
54  int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
55  m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
56 
57  // 5. Initialize LogManager
58  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
59 
60  // 6. Initialize data flow monitor
61  m_flow = new RFFlowStat((char*)shmname.c_str());
62 
63  // 7. Clear PID list
64  m_pid_recv = 0;
65  for (int i = 0; i < m_nnodes; i++)
66  m_pid_sender[i] = 0 ;
67 
68 }
69 
70 RFEventServer::~RFEventServer()
71 {
72  delete m_log;
73  delete m_proc;
74  delete m_shm;
75  delete m_conf;
76  delete m_flow;
77  delete m_rbufin;
78 }
79 
80 // Access to Singleton
81 RFEventServer& RFEventServer::Create(const string& conffile)
82 {
83  if (!s_instance) {
84  s_instance = new RFEventServer(conffile);
85  }
86  return *s_instance;
87 }
88 
89 RFEventServer& RFEventServer::Instance()
90 {
91  return *s_instance;
92 }
93 
94 
95 // Functions hooked up by NSM2
96 
97 int RFEventServer::Configure(NSMmsg*, NSMcontext*)
98 {
99  // 0. Global parameters
100  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
101  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
102  string(m_conf->getconf("distributor", "ringbuffer"));
103 
104  // char* shmname = m_conf->getconf("distributor", "nodename");
105  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
106  string(m_conf->getconf("distributor", "nodename"));
107 
108  // 2. Run sender
109  m_nnodes = 0;
110  int maxnodes = m_conf->getconfi("processor", "nnodes");
111  int idbase = m_conf->getconfi("processor", "idbase");
112  // char* hostbase = m_conf->getconf("processor", "hostbase");
113  char* badlist = m_conf->getconf("processor", "badlist");
114 
115  char* sender = m_conf->getconf("distributor", "sender", "script");
116  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
117 
118  char idname[3], shmid[3];
119  for (int i = 0; i < maxnodes; i++) {
120  sprintf(idname, "%2.2d", idbase + i);
121  sprintf(shmid, "%2.2d", i);
122  if (badlist == NULL ||
123  strstr(badlist, idname) == 0) {
124  int port = (idbase + i) + portbase;
125  char portchar[256];
126  sprintf(portchar, "%d", port);
127  m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
128  m_flow->clear(i);
129  m_nnodes++;
130  }
131  }
132 
133  // 1. Run receiver
134  char* srcG = m_conf->getconf("distributor", "source");
135  if (strstr(srcG, "net") != 0) {
136  // Run receiver
137  char* receiver = m_conf->getconf("distributor", "receiver", "script");
138  char* src = m_conf->getconf("distributor", "receiver", "host");
139  char* port = m_conf->getconf("distributor", "receiver", "port");
140  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
141  char idbuf[3];
142  sprintf(idbuf, "%2.2d", RF_INPUT_ID);
143  m_pid_recv = m_proc->Execute(receiver, (char*)ringbuf.c_str(), src, port, (char*)shmname.c_str(), idbuf);
144  m_flow->clear(RF_INPUT_ID);
145  } else if (strstr(srcG, "file") != 0) {
146  // Run file reader
147  char* filein = m_conf->getconf("distributor", "fileinput", "script");
148  char* file = m_conf->getconf("distributor", "fileinput", "filename");
149  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
150  char* nnodechr = m_conf->getconf("distributor", "nnodes");
151  m_pid_recv = m_proc->Execute(filein, (char*)ringbuf.c_str(), file, nnodechr);
152  }
153 
154  m_rbufin->forceClear();
155 
156  // else none
157  return 0;
158 }
159 
160 int RFEventServer::UnConfigure(NSMmsg*, NSMcontext*)
161 {
162  // system("killall sock2rbr rb2sockr");
163  int status;
164  if (m_pid_recv != 0) {
165  kill(m_pid_recv, SIGINT);
166  waitpid(m_pid_recv, &status, 0);
167  }
168  for (int i = 0; i < m_nnodes; i++) {
169  if (m_pid_sender[i] != 0) {
170  printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
171  // kill(m_pid_sender[i], SIGINT);
172  kill(m_pid_sender[i], SIGINT);
173  waitpid(m_pid_sender[i], &status, 0);
174  }
175  }
176  // Clear RingBuffer
177  m_rbufin->forceClear();
178 
179  // Clear process list
180  m_flow->fillProcessStatus(GetNodeInfo());
181 
182  printf("Unconfigure : done\n");
183  return 0;
184 }
185 
186 int RFEventServer::Start(NSMmsg*, NSMcontext*)
187 {
188  // m_rbufin->clear();
189  // m_rbufin->forceClear();
190  return 0;
191 }
192 
193 int RFEventServer::Stop(NSMmsg*, NSMcontext*)
194 {
195  m_rbufin->clear();
196  return 0;
197 }
198 
199 
200 int RFEventServer::Restart(NSMmsg*, NSMcontext*)
201 {
202  printf("RFEventServer : Restarting!!!!!\n");
203  /* Original impl.
204  if (m_pid_recv != 0) {
205  kill(m_pid_recv, SIGINT);
206  }
207  for (int i = 0; i < m_nnodes; i++) {
208  if (m_pid_sender[i] != 0) {
209  printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
210  kill(m_pid_sender[i], SIGINT);
211  }
212  }
213  // Simple implementation
214  system("killall sock2rbr rb2sockr");
215  fflush(stdout);
216  */
217  NSMmsg* nsmmsg = NULL;
218  NSMcontext* nsmcontext = NULL;
219  RFEventServer::UnConfigure(nsmmsg, nsmcontext);
220  sleep(2);
221  RFEventServer::Configure(nsmmsg, nsmcontext);
222  return 0;
223 }
224 
225 // Server function
226 
227 void RFEventServer::server()
228 {
229  // int nevt = 0;
230  m_flow->fillProcessStatus(GetNodeInfo());
231 
232  while (true) {
233  int sender_id = 0;
234  pid_t pid = m_proc->CheckProcess();
235  if (pid > 0) {
236  printf("RFEventServer : process dead. pid = %d\n", pid);
237  if (pid == m_pid_recv) {
238  m_log->Fatal("RFEventServer : receiver process dead. pid=%d\n", pid);
239  m_pid_recv = 0;
240  } else {
241  for (int i = 0; i < m_nnodes; i++) {
242  if (pid == m_pid_sender[i]) {
243  m_log->Fatal("RFEventServer : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
244  m_pid_sender[i] = 0;
245  sender_id = i;
246  }
247  }
248  }
249  }
250  int st = m_proc->CheckOutput();
251  if (st < 0) {
252  perror("RFEventServer::server");
253  // exit ( -1 );
254  } else if (st > 0) {
255  m_log->ProcessLog(m_proc->GetFd());
256  }
257  m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
258  m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
259  // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
260  // Debug
261  // RfNodeInfo* info = GetNodeInfo();
262  //info->nevent_in = nevt++;
263  // info->nqueue_in = nevt;
264  // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
265  }
266 }
267 
268 void RFEventServer::cleanup()
269 {
270  printf("RFEventServer : cleaning up\n");
271  UnConfigure(NULL, NULL);
272  /*
273  kill ( m_pid_recv, SIGINT );
274  int status;
275  waitpid ( m_pid_recv, &status, 0 );
276  printf ( "RFEventServer : receiver terminated.\n" );
277  for ( int i=0; i<m_nnodes; i++ ) {
278  kill ( m_pid_sender[i], SIGINT );
279  waitpid ( m_pid_sender[i], &status, 0 );
280  printf ( "RFEventServer : sender [%d] terminated.\n", i );
281  }
282  */
283  printf("RFEventServer: Done. Exitting\n");
284  exit(-1);
285 }
286 
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
Definition: nsm2.h:224