Belle II Software  release-05-01-25
ERecoDistributor.cc
1 //+
2 // File : ERecoDistributor.cc
3 // Description : Receive events from Storage and distribute them to
4 // processing nodes
5 //
6 // Author : Ryosuke Itoh, IPNS, KEK
7 // Date : 24 - June - 2013
8 //-
9 
10 #include "daq/expreco/ERecoDistributor.h"
11 
12 #include <sys/stat.h>
13 #include <sys/types.h>
14 #include <sys/wait.h>
15 #include <unistd.h>
16 
17 #include <csignal>
18 #include <cstring>
19 #include <iostream>
20 
21 #define RFEVSOUT stdout
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 ERecoDistributor::ERecoDistributor(string conffile)
27 {
28  // 0. Initialize configuration manager
29  m_conf = new RFConf(conffile.c_str());
30  char* nodename = m_conf->getconf("distributor", "nodename");
31  // char nodename[256];
32  // gethostname ( nodename, sizeof(nodename) );
33 
34  // 1. Set execution directory
35  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
36 
37  mkdir(execdir.c_str(), 0755);
38  chdir(execdir.c_str());
39 
40  // 2. Initialize local shared memory
41  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
42  string(m_conf->getconf("distributor", "nodename"));
43  m_shm = new RFSharedMem((char*)shmname.c_str());
44 
45  // 3. Initialize process manager
46  m_proc = new RFProcessManager(nodename);
47 
48  // 4. Initialize RingBuffers
49  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
50  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
51  string(m_conf->getconf("distributor", "ringbuffer"));
52  int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
53  m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
54 
55  // 5. Initialize LogManager
56  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
57 
58  // 6. Initialize data flow monitor
59  m_flow = new RFFlowStat((char*)shmname.c_str());
60 
61 }
62 
// Release every helper object allocated in the constructor.
// The deletion order below is kept as-is: several of these objects attach
// to shared memory / IPC resources, and their teardown order is part of the
// observable behaviour.
// NOTE(review): m_log and m_proc were given `nodename`, a pointer obtained
// from m_conf; both are deleted before m_conf, which keeps that pointer
// valid for their destructors.
ERecoDistributor::~ERecoDistributor()
{
  delete m_log;
  delete m_proc;
  delete m_shm;
  delete m_conf;
  delete m_flow;
  delete m_rbufin;
}
72 
73 
74 // Functions hooked up by NSM2
75 
76 int ERecoDistributor::Configure(NSMmsg*, NSMcontext*)
77 {
78  // 0. Global parameters
79  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
80  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
81  string(m_conf->getconf("distributor", "ringbuffer"));
82 
83  // char* shmname = m_conf->getconf("distributor", "nodename");
84  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
85  string(m_conf->getconf("distributor", "nodename"));
86 
87  // 1. Run sender
88  m_nnodes = 0;
89  int maxnodes = m_conf->getconfi("processor", "nnodes");
90  int idbase = m_conf->getconfi("processor", "idbase");
91  char* hostbase = m_conf->getconf("processor", "hostbase");
92  char* badlist = m_conf->getconf("processor", "badlist");
93 
94  char* sender = m_conf->getconf("distributor", "sender", "script");
95  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
96 
97 
98  char hostname[512], idname[3], shmid[3];
99  for (int i = 0; i < maxnodes; i++) {
100  sprintf(idname, "%2.2d", idbase + i);
101  sprintf(shmid, "%2.2d", i);
102  if (badlist == NULL ||
103  strstr(badlist, idname) == 0) {
104  int port = (idbase + i) + portbase;
105  char portchar[256];
106  sprintf(portchar, "%d", port);
107  m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
108  printf("Running sender to %d\n", port);
109  fflush(stdout);
110  m_nnodes++;
111  }
112  }
113 
114  // 2. Run receiver
115  m_nrecv = 0;
116  int maxrecv = m_conf->getconfi("distributor", "receiver", "nnodes");
117  int ridbase = m_conf->getconfi("distributor", "receiver", "idbase");
118  char* rhostbase = m_conf->getconf("distributor", "receiver", "hostbase");
119  char* rbadlist = m_conf->getconf("distributor", "receiver", "badlist");
120  char* port = m_conf->getconf("distributor", "receiver", "port");
121  char* receiver = m_conf->getconf("distributor", "receiver", "script");
122 
123  // char hostname[512], idname[3], shmid[3];
124  for (int i = 0; i < maxrecv; i++) {
125  sprintf(hostname, "%s%2.2d", rhostbase, ridbase + i);
126  sprintf(idname, "%2.2d", ridbase + i);
127  sprintf(shmid, "%2.2d", i + 20);
128  if (rbadlist == NULL ||
129  strstr(rbadlist, hostname) == 0) {
130  printf("Running receiver for %s\n", hostname);
131  fflush(stdout);
132  m_pid_recv[m_nrecv] = m_proc->Execute(receiver, (char*)ringbuf.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
133  m_nrecv++;
134  }
135  }
136 
137  printf("ERecoDistributor : Configure done\n");
138  return 0;
139 }
140 
141 int ERecoDistributor::UnConfigure(NSMmsg*, NSMcontext*)
142 {
143  // system("killall sock2rbr rb2sockr");
144  int status;
145 
146  printf("m_nrecv = %d\n", m_nrecv);
147  for (int i = 0; i < m_nrecv; i++) {
148  if (m_pid_recv[i] != 0) {
149  printf("ERecoDistributor : killing receiver pid=%d\n", m_pid_recv[i]);
150  kill(m_pid_recv[i], SIGKILL);
151  waitpid(m_pid_recv[i], &status, 0);
152  }
153  }
154 
155  printf("m_nnodes = %d\n", m_nnodes);
156  for (int i = 0; i < m_nnodes; i++) {
157  if (m_pid_sender[i] != 0) {
158  printf("ERecoDistributor : killing sender pid=%d\n", m_pid_sender[i]);
159  // kill(m_pid_sender[i], SIGINT);
160  kill(m_pid_sender[i], SIGKILL);
161  waitpid(m_pid_sender[i], &status, 0);
162  }
163  }
164  // Clear RingBuffer
165  m_rbufin->forceClear();
166 
167  // Clear process list
168  m_flow->fillProcessStatus(GetNodeInfo());
169 
170  printf("ERecoDistributor : Unconfigure done\n");
171  return 0;
172 }
173 
174 int ERecoDistributor::Start(NSMmsg*, NSMcontext*)
175 {
176  // m_rbufin->clear();
177  return 0;
178 }
179 
180 int ERecoDistributor::Stop(NSMmsg*, NSMcontext*)
181 {
182  // m_rbufin->clear();
183  return 0;
184 }
185 
186 
187 int ERecoDistributor::Restart(NSMmsg*, NSMcontext*)
188 {
189  printf("ERecoDistributor : Restarting!!!!!\n");
190  NSMmsg* nsmmsg = NULL;
191  NSMcontext* nsmcontext = NULL;
192  ERecoDistributor::UnConfigure(nsmmsg, nsmcontext);
193  sleep(2);
194  ERecoDistributor::Configure(nsmmsg, nsmcontext);
195  return 0;
196 }
197 
198 // Server function
199 
200 void ERecoDistributor::server()
201 {
202  // int nevt = 0;
203  m_flow->fillProcessStatus(GetNodeInfo());
204 
205  while (true) {
206  int sender_id = 0; // the only useage is commented below? do we need it?
207  pid_t pid = m_proc->CheckProcess();
208  if (pid > 0) {
209  printf("ERecoDistributor : process dead. pid = %d\n", pid);
210  for (int i = 0; i < m_nrecv; i++) {
211  if (pid == m_pid_recv[i]) {
212  m_log->Fatal("ERecoDistributor : receiver process (%d) dead. pid=%d\n", i, pid);
213  m_pid_recv[i] = 0;
214  }
215  }
216  for (int i = 0; i < m_nnodes; i++) {
217  if (pid == m_pid_sender[i]) {
218  m_log->Fatal("ERecoDistributor : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
219  m_pid_sender[i] = 0;
220  sender_id = i;
221  }
222  }
223  }
224 
225  int st = m_proc->CheckOutput();
226  if (st < 0) {
227  perror("ERecoDistributor::server");
228  // exit ( -1 );
229  } else if (st > 0) {
230  m_log->ProcessLog(m_proc->GetFd());
231  }
232  }
233  // m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
234  // m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
235  // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
236  // Debug
237  // RfNodeInfo* info = GetNodeInfo();
238  //info->nevent_in = nevt++;
239  // info->nqueue_in = nevt;
240  // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
241 }
242 
243 
244 
245 
NSMmsg
Definition: nsm2.h:217
Belle2::RFFlowStat
Definition: RFFlowStat.h:28
Belle2::RFSharedMem
Definition: RFSharedMem.h:51
Belle2::RFLogManager
Definition: RFLogManager.h:18
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::RFProcessManager
Definition: RFProcessManager.h:22
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RFConf
Definition: RFConf.h:24
NSMcontext_struct
Definition: nsmlib2.h:66