Belle II Software  release-08-01-10
RFRoiSender.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/rfarm/manager/RFRoiSender.h>
10 
11 #include <sys/stat.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 
15 #include <unistd.h>
16 
17 #include <csignal>
18 #include <cstring>
19 #include <string>
20 
21 using namespace Belle2;
22 using namespace std;
23 
24 RFRoiSender::RFRoiSender(string conffile)
25 {
26  // 0. Initialize configuration manager
27  m_conf = new RFConf(conffile.c_str());
28  char* nodename = m_conf->getconf("roisender", "nodename");
29  printf("RoiSender : nodename = %s\n", nodename);
30  fflush(stdout);
31  // char nodename[256];
32  // gethostname ( nodename, sizeof(nodename) );
33 
34  // 1. Set execution directory
35  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/roisender";
36 
37  mkdir(execdir.c_str(), 0755);
38  chdir(execdir.c_str());
39 
40  // 2. Initialize local shared memory
41  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
42  string(nodename);
43  m_shm = new RFSharedMem((char*)shmname.c_str());
44  printf("RoiSender : shmname = %s\n", shmname.c_str());
45 
46  // 3. Initialize process manager
47  m_proc = new RFProcessManager(nodename);
48 
49  // 4. Initialize LogManager
50  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
51 
52  // 5. Initialize data flow monitor
53  m_flow = new RFFlowStat((char*)shmname.c_str());
54 
55  // 6. Clear PID list
56  m_pid_sender = 0;
57  m_pid_merger = 0;
58 
59 }
60 
61 RFRoiSender::~RFRoiSender()
62 {
63  delete m_log;
64  delete m_proc;
65  delete m_conf;
66 }
67 
68 // Functions hooked up by NSM2
69 
70 int RFRoiSender::Configure(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
71 {
72  // 0. Do you need RoI sender?
73  char* roisw = m_conf->getconf("roisender", "enabled");
74  if (nsmm->len > 0) {
75  roisw = (char*) nsmm->datap;
76  printf("Configure: roisender enable flag is overridden : %s\n", roisw);
77  }
78  if (strstr(roisw, "yes") == 0) return 0;
79 
80  // 1. Run merger first
81  char* merger = m_conf->getconf("roisender", "merger");
82  char* mergerport = m_conf->getconf("roisender", "mergerport");
83  char* mergerhost = m_conf->getconf("roisender", "mergerhost");
84  char* onsenhost = m_conf->getconf("roisender", "onsenhost");
85  char* onsenport = m_conf->getconf("roisender", "onsenport");
86 
87  string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
88  string(m_conf->getconf("roisender", "nodename"));
89 
90  // Note: merger should be run on a separate host in Belle2DAQ
91  if (strstr(merger, "none") == 0) {
92  char idbuf[3];
93  sprintf(idbuf, "%2.2d", RF_ROI_ID);
94  m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
95  sleep(2);
96  }
97 
98  // 2. Run sender
99  char* sender = m_conf->getconf("roisender", "sender");
100  int nqueue = m_conf->getconfi("roisender", "nqueues");
101  // char* qbase = m_conf->getconf("roisender", "qnamebase");
102 
103  char* arglist[20];
104  arglist[0] = mergerhost;
105  arglist[1] = mergerport;
106  int nargs = 2;
107  char roiqs[10][256];
108  for (int i = 0; i < nqueue; i++) {
109  sprintf(roiqs[i], "/roi%d", i);
110  arglist[i + 2] = roiqs[i];
111  nargs++;
112  }
113  // m_pid_sender = m_proc->Execute(sender, mergerhost, mergerport);
114  m_pid_sender = m_proc->Execute(sender, nargs, arglist);
115 
116  return 0;
117 
118 }
119 
120 int RFRoiSender::UnConfigure(NSMmsg*, NSMcontext*)
121 {
122  // system("killall merger_merge hltout2merger");
123  int statx;
124  if (m_pid_sender != 0) {
125  kill(m_pid_sender, SIGINT);
126  waitpid(m_pid_sender, &statx, 0);
127  m_pid_sender = 0;
128  }
129  if (m_pid_merger != 0) {
130  kill(m_pid_merger, SIGINT);
131  waitpid(m_pid_merger, &statx, 0);
132  m_pid_merger = 0;
133  }
134  printf("UnConfigure : done\n");
135  return 0;
136 }
137 
138 int RFRoiSender::Start(NSMmsg*, NSMcontext*)
139 {
140  return 0;
141 }
142 
143 int RFRoiSender::Stop(NSMmsg*, NSMcontext*)
144 {
145  return 0;
146 }
147 
148 
149 int RFRoiSender::Restart(NSMmsg*, NSMcontext*)
150 {
151  printf("RFRoiSender : Restarted!!!!!\n");
152  /* Original Impl
153  if (m_pid_dqm != 0) {
154  kill(m_pid_dqm, SIGINT);
155  }
156  */
157  // system("killall merger_merge hltout2merger");
158  // fflush(stdout);
159  NSMmsg* nsmmsg = NULL;
160  NSMcontext* nsmcontext = NULL;
161  RFRoiSender::UnConfigure(nsmmsg, nsmcontext);
162  sleep(2);
163  RFRoiSender::Configure(nsmmsg, nsmcontext);
164  return 0;
165 }
166 
167 // Server function
168 
169 void RFRoiSender::server()
170 {
171  while (true) {
172  pid_t pid = m_proc->CheckProcess();
173  if (pid > 0) {
174  printf("RFRoiSender : process dead. pid=%d\n", pid);
175  if (pid == m_pid_sender) {
176  m_log->Fatal("RFRoiSender : hltout2merger dead. pid = %d\n", pid);
177  // RFNSM_Status::Instance().set_state(RFSTATE_ERROR); // << will this really set the state? ERROR is not defined anyway
178  } else if (pid == m_pid_merger) {
179  m_log->Fatal("RFRoiSender : merger2merge dead. pid = %d\n", pid);
180  // RFNSM_Status::Instance().set_state(RFSTATE_ERROR); // << will this really set the state? ERROR is not defined anyway
181  }
182  }
183 
184  int st = m_proc->CheckOutput();
185  if (st < 0) {
186  perror("RFRoiSender::server");
187  // exit ( -1 );
188  } else if (st > 0) {
189  m_log->ProcessLog(m_proc->GetFd());
190  }
191  m_flow->fillNodeInfo(RF_ROI_ID, GetNodeInfo(), true);
192  }
193 }
194 
195 void RFRoiSender::cleanup()
196 {
197  printf("RFRoiSender : cleaning up\n");
198  UnConfigure(NULL, NULL);
199  printf("RFRoiSender: Done. Exitting\n");
200  exit(-1);
201 }
Abstract base class for different kinds of events.
Definition: nsm2.h:224