Belle II Software  release-08-01-10
ERecoMaster.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/expreco/ERecoMaster.h"
10 
11 #include "daq/rfarm/manager/RFNSM.h"
12 
13 #include <nsm2/belle2nsm.h>
14 
15 #include <sys/stat.h>
16 #include <sys/types.h>
17 #include <unistd.h>
18 
19 #include <cstring>
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //#define DESY
25 
26 
27 // Main
28 ERecoMaster::ERecoMaster(string conffile)
29 {
30  // 0. Initialize configuration manager
31  m_conf = new RFConf(conffile.c_str());
32  char* nodename = m_conf->getconf("master", "nodename");
33  // char nodename[256];
34  // gethostname ( nodename, sizeof(nodename) );
35 
36  // 1. Initialize local shared memory
37  // m_shm = new RFSharedMem(nodename);
38 
39  // 2. Set execution directory
40  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/master";
41 
42  mkdir(execdir.c_str(), 0755);
43  chdir(execdir.c_str());
44 
45  // 3. Initialize LogManager
46  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
47 
48  // 4. Leave PID file
49  FILE* f = fopen("pid.data", "w");
50  fprintf(f, "%d", getpid());
51  fclose(f);
52 
53 
54 }
55 
56 ERecoMaster::~ERecoMaster()
57 {
58  delete m_log;
59  // delete m_shm;
60  delete m_conf;
61 }
62 
63 void ERecoMaster::Hook_Message_Handlers()
64 {
65  // 5. Hook message handlers
66  if (b2nsm_callback("LOG", Log_Handler) < 0) {
67  fprintf(stderr, "ERecoMaster : hooking INFO handler failed, %s\n",
68  b2nsm_strerror());
69  }
70  printf("ERecoMaster: Message Handlers - Ready\n");
71 
72 }
73 
74 // NSM callback functions for message
75 
76 void ERecoMaster::Log_Handler(NSMmsg* /*msg*/, NSMcontext* /*ctx*/)
77 {
78  // printf ( "ERecoMaster : [INFO] received\n" );
79  // b2nsm_ok ( msg, "INFO!!", NULL );
80  // fflush ( stdout );
81 }
82 
83 
84 // Functions hooked up by NSM2
85 
86 int ERecoMaster::Configure(NSMmsg*, NSMcontext*)
87 {
88  int* pars = nullptr;
89 
90  // 0. Configure DqmServer
91  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
92  RFNSM_Status::Instance().set_flag(0);
93  // b2nsm_sendreq(dqmserver, "RF_CONFIGURE", 0, pars);
94  b2nsm_sendreq(dqmserver, "RC_LOAD", 0, pars);
95  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
96  printf("ERecoMaster:: dqmserver configured\n");
97  sleep(2);
98 
99  // 1. Configure distributor
100  char* distributor = m_conf->getconf("distributor", "nodename");
101  RFNSM_Status::Instance().set_flag(0);
102  // b2nsm_sendreq(distributor, "RF_CONFIGURE", 0, pars);
103  b2nsm_sendreq(distributor, "RC_LOAD", 0, pars);
104  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
105  printf("ERecoMaster:: distributor configured\n");
106 
107  sleep(2);
108 
109  // 2. Configure event processors
110  int maxnodes = m_conf->getconfi("processor", "nnodes");
111  int idbase = m_conf->getconfi("processor", "idbase");
112  char* hostbase = m_conf->getconf("processor", "nodebase");
113  char* badlist = m_conf->getconf("processor", "badlist");
114 
115  char hostnode[512], idname[3];
116  int nnodes = 0;
117  RFNSM_Status::Instance().set_flag(0);
118  for (int i = 0; i < maxnodes; i++) {
119  sprintf(idname, "%2.2d", idbase + i);
120  if (badlist == NULL ||
121  strstr(badlist, idname) == 0) {
122  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
123  // b2nsm_sendreq(hostnode, "RF_CONFIGURE", 0, pars);
124  b2nsm_sendreq(hostnode, "RC_LOAD", 0, pars);
125  nnodes++;
126  }
127  }
128  while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
129 
130  sleep(10);
131 
132  // 3. Configure event sampler
133  char* sampler = m_conf->getconf("eventsampler", "nodename");
134  RFNSM_Status::Instance().set_flag(0);
135  // b2nsm_sendreq(distributor, "RF_CONFIGURE", 0, pars);
136  b2nsm_sendreq(sampler, "RC_LOAD", 0, pars);
137  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
138  printf("ERecoMaster:: sampler configured\n");
139 
140  sleep(2);
141 
142 
143  printf("ERecoMaster:: event processors configured\n");
144 
145  return 0;
146 }
147 
148 int ERecoMaster::UnConfigure(NSMmsg*, NSMcontext*)
149 {
150  int* pars = nullptr;
151 
152  // Unconfigure sampler
153  char* sampler = m_conf->getconf("eventsampler", "nodename");
154  RFNSM_Status::Instance().set_flag(0);
155  // b2nsm_sendreq(distributor, "RF_UNCONFIGURE", 0, pars);
156  b2nsm_sendreq(sampler, "RC_ABORT", 0, pars);
157  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
158  printf("ErecoMaster: sampler unconfigured.\n");
159 
160  // Unconfigure event processors
161  int maxnodes = m_conf->getconfi("processor", "nnodes");
162  int idbase = m_conf->getconfi("processor", "idbase");
163  char* hostbase = m_conf->getconf("processor", "nodebase");
164  char* badlist = m_conf->getconf("processor", "badlist");
165 
166  char hostnode[512], idname[3];
167  RFNSM_Status::Instance().set_flag(0);
168  int nnodes = 0;
169  for (int i = 0; i < maxnodes; i++) {
170  sprintf(idname, "%2.2d", idbase + i);
171  if (badlist == NULL ||
172  strstr(badlist, idname) == 0) {
173  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
174  // b2nsm_sendreq(hostnode, "RF_UNCONFIGURE", 0, pars);
175  b2nsm_sendreq(hostnode, "RC_ABORT", 0, pars);
176  nnodes++;
177  }
178  }
179 #ifdef DESY
180  b2nsm_wait(5);
181 #else
182  while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
183 #endif
184  printf("ERecoMaster: eventprocessors unconfigured.\n");
185 
186  // Unconfigure distributor
187  char* distributor = m_conf->getconf("distributor", "nodename");
188  RFNSM_Status::Instance().set_flag(0);
189  // b2nsm_sendreq(distributor, "RF_UNCONFIGURE", 0, pars);
190  b2nsm_sendreq(distributor, "RC_ABORT", 0, pars);
191  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
192  printf("ErecoMaster: distributor unconfigured.\n");
193 
194  // Unconfigure DqmServer
195  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
196  RFNSM_Status::Instance().set_flag(0);
197  // b2nsm_sendreq(dqmserver, "RF_UNCONFIGURE", 0, pars);
198  b2nsm_sendreq(dqmserver, "RC_ABORT", 0, pars);
199  // while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
200  sleep(5);
201  printf("ErecoMaster: dqmserver unconfigured.\n");
202 
203  return 0;
204 
205 }
206 
207 int ERecoMaster::Start(NSMmsg*, NSMcontext*)
208 {
209  return 0;
210 }
211 
212 int ERecoMaster::Stop(NSMmsg* msg, NSMcontext*)
213 {
214  int pars[10];
215  pars[0] = msg->pars[0];
216  pars[1] = msg->pars[1];
217 
218  // 1. Stop worker nodes
219  // Unconfigure event processors
220  int maxnodes = m_conf->getconfi("processor", "nnodes");
221  int idbase = m_conf->getconfi("processor", "idbase");
222  char* hostbase = m_conf->getconf("processor", "nodebase");
223  char* badlist = m_conf->getconf("processor", "badlist");
224 
225  char hostnode[512], idname[3];
226  RFNSM_Status::Instance().set_flag(0);
227  int nnodes = 0;
228  for (int i = 0; i < maxnodes; i++) {
229  sprintf(idname, "%2.2d", idbase + i);
230  if (badlist == NULL ||
231  strstr(badlist, idname) == 0) {
232  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
233  // b2nsm_sendreq(hostnode, "RF_STOP", 0, pars);
234  b2nsm_sendreq(hostnode, "RC_STOP", 0, pars);
235  nnodes++;
236  }
237  }
238 #ifdef DESY
239  b2nsm_wait(5);
240 #else
241  while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
242 #endif
243 
244  // 2. Stop DqmServer node
245  // Unconfigure DqmServer
246  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
247  RFNSM_Status::Instance().set_flag(0);
248  // b2nsm_sendreq(dqmserver, "RF_STOP", 0, pars);
249  b2nsm_sendreq(dqmserver, "RC_STOP", 0, pars);
250  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
251 
252  return 0;
253 }
254 
255 
256 int ERecoMaster::Restart(NSMmsg*, NSMcontext*)
257 {
258  int* pars = nullptr;
259 
260  // 0. Configure DqmServer
261  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
262  // b2nsm_sendreq(dqmserver, "RF_RESTART", 0, pars);
263  b2nsm_sendreq(dqmserver, "RC_RECOVER", 0, pars);
264  sleep(2);
265 
266  // 1. Configure distributor
267  char* distributor = m_conf->getconf("distributor", "nodename");
268  // b2nsm_sendreq(distributor, "RF_RESTART", 0, pars);
269  b2nsm_sendreq(distributor, "RC_RECOVER", 0, pars);
270  sleep(2);
271 
272  // 2. Configure event processors
273  int maxnodes = m_conf->getconfi("processor", "nnodes");
274  int idbase = m_conf->getconfi("processor", "idbase");
275  char* hostbase = m_conf->getconf("processor", "hostbase");
276  char* badlist = m_conf->getconf("processor", "badlist");
277 
278  char hostnode[512], idname[3];
279  for (int i = 0; i < maxnodes; i++) {
280  sprintf(idname, "%2.2d", idbase + i);
281  if (badlist == NULL ||
282  strstr(badlist, idname) == 0) {
283  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
284  // b2nsm_sendreq(hostnode, "RF_RESTART", 0, pars);
285  b2nsm_sendreq(hostnode, "RC_RECOVER", 0, pars);
286  }
287  }
288  sleep(2);
289 
290  return 0;
291 }
292 
293 // Server function
294 
295 void ERecoMaster::monitor_loop()
296 {
297  while (true) {
298  b2nsm_wait(10);
299  }
300 }
301 
302 
303 
Abstract base class for different kinds of events.
Definition: nsm2.h:224