Belle II Software  release-08-01-10
ERecoDistributor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include "daq/expreco/ERecoDistributor.h"

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <csignal>
#include <cstdio>
#include <cstring>
#include <iostream>
19 
// Output stream alias in the style of the other RF* sources.
// NOTE(review): RFEVSOUT is not referenced anywhere in this file; presumably a
// leftover copied from a sibling RF* source — confirm before removing.
#define RFEVSOUT stdout

using namespace std;
using namespace Belle2;
24 
25 ERecoDistributor::ERecoDistributor(string conffile)
26 {
27  // 0. Initialize configuration manager
28  m_conf = new RFConf(conffile.c_str());
29  char* nodename = m_conf->getconf("distributor", "nodename");
30  // char nodename[256];
31  // gethostname ( nodename, sizeof(nodename) );
32 
33  // 1. Set execution directory
34  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
35 
36  mkdir(execdir.c_str(), 0755);
37  chdir(execdir.c_str());
38 
39  // 2. Initialize local shared memory
40  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
41  string(m_conf->getconf("distributor", "nodename"));
42  m_shm = new RFSharedMem((char*)shmname.c_str());
43 
44  // 3. Initialize process manager
45  m_proc = new RFProcessManager(nodename);
46 
47  // 4. Initialize RingBuffers
48  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
49  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
50  string(m_conf->getconf("distributor", "ringbuffer"));
51  int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
52  m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
53 
54  // 5. Initialize LogManager
55  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
56 
57  // 6. Initialize data flow monitor
58  m_flow = new RFFlowStat((char*)shmname.c_str());
59 
60 }
61 
/**
 * Release the helper objects allocated in the constructor.
 *
 * The deletion order is kept deliberately: these RF* helpers and the
 * RingBuffer are defined elsewhere and their destructors may have side
 * effects (e.g. on shared memory) whose relative order could matter.
 * NOTE(review): sender/receiver child processes started by Configure() are
 * not terminated here; confirm UnConfigure() is always invoked first.
 */
ERecoDistributor::~ERecoDistributor()
{
  delete m_log;
  delete m_proc;
  delete m_shm;
  delete m_conf;
  delete m_flow;
  delete m_rbufin;
}
71 
72 
73 // Functions hooked up by NSM2
74 
75 int ERecoDistributor::Configure(NSMmsg*, NSMcontext*)
76 {
77  // 0. Global parameters
78  // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
79  string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
80  string(m_conf->getconf("distributor", "ringbuffer"));
81 
82  // char* shmname = m_conf->getconf("distributor", "nodename");
83  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
84  string(m_conf->getconf("distributor", "nodename"));
85 
86  // 1. Run sender
87  m_nnodes = 0;
88  int maxnodes = m_conf->getconfi("processor", "nnodes");
89  int idbase = m_conf->getconfi("processor", "idbase");
90  //char* hostbase = m_conf->getconf("processor", "hostbase");
91  char* badlist = m_conf->getconf("processor", "badlist");
92 
93  char* sender = m_conf->getconf("distributor", "sender", "script");
94  int portbase = m_conf->getconfi("distributor", "sender", "portbase");
95 
96 
97  char hostname[512], idname[3], shmid[3];
98  for (int i = 0; i < maxnodes; i++) {
99  sprintf(idname, "%2.2d", idbase + i);
100  sprintf(shmid, "%2.2d", i);
101  if (badlist == NULL ||
102  strstr(badlist, idname) == 0) {
103  int port = (idbase + i) + portbase;
104  char portchar[256];
105  sprintf(portchar, "%d", port);
106  m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
107  printf("Running sender to %d\n", port);
108  fflush(stdout);
109  m_nnodes++;
110  }
111  }
112 
113  // 2. Run receiver
114  m_nrecv = 0;
115  int maxrecv = m_conf->getconfi("distributor", "receiver", "nnodes");
116  int ridbase = m_conf->getconfi("distributor", "receiver", "idbase");
117  char* rhostbase = m_conf->getconf("distributor", "receiver", "hostbase");
118  char* rbadlist = m_conf->getconf("distributor", "receiver", "badlist");
119  char* port = m_conf->getconf("distributor", "receiver", "port");
120  char* receiver = m_conf->getconf("distributor", "receiver", "script");
121 
122  // char hostname[512], idname[3], shmid[3];
123  for (int i = 0; i < maxrecv; i++) {
124  sprintf(hostname, "%s%2.2d", rhostbase, ridbase + i);
125  sprintf(idname, "%2.2d", ridbase + i);
126  sprintf(shmid, "%2.2d", i + 20);
127  if (rbadlist == NULL ||
128  strstr(rbadlist, hostname) == 0) {
129  printf("Running receiver for %s\n", hostname);
130  fflush(stdout);
131  m_pid_recv[m_nrecv] = m_proc->Execute(receiver, (char*)ringbuf.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
132  m_nrecv++;
133  }
134  }
135 
136  printf("ERecoDistributor : Configure done\n");
137  return 0;
138 }
139 
140 int ERecoDistributor::UnConfigure(NSMmsg*, NSMcontext*)
141 {
142  // system("killall sock2rbr rb2sockr");
143  int status;
144 
145  printf("m_nrecv = %d\n", m_nrecv);
146  for (int i = 0; i < m_nrecv; i++) {
147  if (m_pid_recv[i] != 0) {
148  printf("ERecoDistributor : killing receiver pid=%d\n", m_pid_recv[i]);
149  kill(m_pid_recv[i], SIGKILL);
150  waitpid(m_pid_recv[i], &status, 0);
151  }
152  }
153 
154  printf("m_nnodes = %d\n", m_nnodes);
155  for (int i = 0; i < m_nnodes; i++) {
156  if (m_pid_sender[i] != 0) {
157  printf("ERecoDistributor : killing sender pid=%d\n", m_pid_sender[i]);
158  // kill(m_pid_sender[i], SIGINT);
159  kill(m_pid_sender[i], SIGKILL);
160  waitpid(m_pid_sender[i], &status, 0);
161  }
162  }
163  // Clear RingBuffer
164  m_rbufin->forceClear();
165 
166  // Clear process list
167  m_flow->fillProcessStatus(GetNodeInfo());
168 
169  printf("ERecoDistributor : Unconfigure done\n");
170  return 0;
171 }
172 
173 int ERecoDistributor::Start(NSMmsg*, NSMcontext*)
174 {
175  m_rbufin->clear();
176  return 0;
177 }
178 
179 int ERecoDistributor::Stop(NSMmsg*, NSMcontext*)
180 {
181  // m_rbufin->clear();
182  return 0;
183 }
184 
185 
186 int ERecoDistributor::Restart(NSMmsg*, NSMcontext*)
187 {
188  printf("ERecoDistributor : Restarting!!!!!\n");
189  NSMmsg* nsmmsg = NULL;
190  NSMcontext* nsmcontext = NULL;
191  ERecoDistributor::UnConfigure(nsmmsg, nsmcontext);
192  sleep(2);
193  ERecoDistributor::Configure(nsmmsg, nsmcontext);
194  return 0;
195 }
196 
197 // Server function
198 
199 void ERecoDistributor::server()
200 {
201  // int nevt = 0;
202  m_flow->fillProcessStatus(GetNodeInfo());
203 
204  while (true) {
205  pid_t pid = m_proc->CheckProcess();
206  if (pid > 0) {
207  printf("ERecoDistributor : process dead. pid = %d\n", pid);
208  for (int i = 0; i < m_nrecv; i++) {
209  if (pid == m_pid_recv[i]) {
210  m_log->Fatal("ERecoDistributor : receiver process (%d) dead. pid=%d\n", i, pid);
211  m_pid_recv[i] = 0;
212  }
213  }
214  for (int i = 0; i < m_nnodes; i++) {
215  if (pid == m_pid_sender[i]) {
216  m_log->Fatal("ERecoDistributor : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
217  m_pid_sender[i] = 0;
218  }
219  }
220  }
221 
222  int st = m_proc->CheckOutput();
223  if (st < 0) {
224  perror("ERecoDistributor::server");
225  // exit ( -1 );
226  } else if (st > 0) {
227  m_log->ProcessLog(m_proc->GetFd());
228  }
229  }
230  // m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
231  // m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
232  // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
233  // Debug
234  // RfNodeInfo* info = GetNodeInfo();
235  //info->nevent_in = nevt++;
236  // info->nqueue_in = nevt;
237  // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
238 }
239 
240 
241 
242 
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
Definition: nsm2.h:224