Belle II Software  release-06-02-00
RFMaster.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/rfarm/manager/RFMaster.h"
10 #include "daq/rfarm/manager/RFNSM.h"
11 
12 #include <nsm2/belle2nsm.h>
13 
14 #include <sys/stat.h>
15 #include <sys/types.h>
16 #include <unistd.h>
17 
18 #include <cstring>
19 
20 using namespace std;
21 using namespace Belle2;
22 
23 //#define DESY
24 
25 // Main
26 RFMaster::RFMaster(string conffile)
27 {
28  // 0. Initialize configuration manager
29  m_conf = new RFConf(conffile.c_str());
30  char* nodename = m_conf->getconf("master", "nodename");
31  // char nodename[256];
32  // gethostname ( nodename, sizeof(nodename) );
33 
34  // 1. Initialize local shared memory
35  // m_shm = new RFSharedMem(nodename);
36 
37  // 2. Set execution directory
38  string execdir = string(m_conf->getconf("system", "execdir_base")) + "/master";
39 
40  mkdir(execdir.c_str(), 0755);
41  chdir(execdir.c_str());
42 
43  // 3. Initialize LogManager
44  m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
45 
46  // 4. Leave PID file
47  FILE* f = fopen("pid.data", "w");
48  fprintf(f, "%d", getpid());
49  fclose(f);
50 
51 }
52 
53 RFMaster::~RFMaster()
54 {
55  delete m_log;
56  // delete m_shm;
57  delete m_conf;
58 }
59 
60 void RFMaster::Hook_Message_Handlers()
61 {
62  // 5. Hook message handlers
63  if (b2nsm_callback("LOG", Log_Handler) < 0) {
64  fprintf(stderr, "RFMaster : hooking INFO handler failed, %s\n",
65  b2nsm_strerror());
66  }
67  printf("RFMaster: Message Handlers - Ready\n");
68 
69 }
70 
71 // NSM callback functions for message
72 
73 void RFMaster::Log_Handler(NSMmsg* msg, NSMcontext* ctx)
74 {
75  // printf ( "RFMaster : [INFO] received\n" );
76  // b2nsm_ok ( msg, "INFO!!", NULL );
77  // fflush ( stdout );
78 }
79 
80 
81 // Functions hooked up by NSM2
82 
83 int RFMaster::Configure(NSMmsg*, NSMcontext*)
84 {
85  int* pars;
86 
87  // 0. Configure DqmServer
88  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
89  RFNSM_Status::Instance().set_flag(0);
90  // b2nsm_sendreq(dqmserver, "RF_CONFIGURE", 0, pars);
91  b2nsm_sendreq(dqmserver, "RC_LOAD", 0, pars);
92  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
93  printf("RFMaster:: dqmserver configured\n");
94  // sleep(2);
95 
96  // 1. Configure distributor
97  char* distributor = m_conf->getconf("distributor", "nodename");
98  RFNSM_Status::Instance().set_flag(0);
99  // b2nsm_sendreq(distributor, "RF_CONFIGURE", 0, pars);
100  b2nsm_sendreq(distributor, "RC_LOAD", 0, pars);
101  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
102  printf("RFMaster:: distributor configured\n");
103 
104  // sleep(2);
105 
106  // 2. Configure event processors
107  int maxnodes = m_conf->getconfi("processor", "nnodes");
108  int idbase = m_conf->getconfi("processor", "idbase");
109  char* hostbase = m_conf->getconf("processor", "nodebase");
110  char* badlist = m_conf->getconf("processor", "badlist");
111 
112  char hostnode[512], idname[3];
113  int nnodes = 0;
114  RFNSM_Status::Instance().set_flag(0);
115  for (int i = 0; i < maxnodes; i++) {
116  sprintf(idname, "%2.2d", idbase + i);
117  if (badlist == NULL ||
118  strstr(badlist, idname) == 0) {
119  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
120  // b2nsm_sendreq(hostnode, "RF_CONFIGURE", 0, pars);
121  b2nsm_sendreq(hostnode, "RC_LOAD", 0, pars);
122  nnodes++;
123  }
124  }
125  // while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
126  sleep(10);
127 
128  printf("RFMaster:: distributor configured\n");
129 
130  // 3. Configure collector
131  char* collector = m_conf->getconf("collector", "nodename");
132  RFNSM_Status::Instance().set_flag(0);
133  // b2nsm_sendreq(collector, "RF_CONFIGURE", 0, pars);
134  b2nsm_sendreq(collector, "RC_LOAD", 0, pars);
135  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
136  printf("RFMaster:: collector configured\n");
137  // sleep ( 5 );
138 
139  // 4. Configure
140  char* roisender = m_conf->getconf("roisender", "nodename");
141  RFNSM_Status::Instance().set_flag(0);
142  // b2nsm_sendreq(roisender, "RF_CONFIGURE", 0, pars);
143  b2nsm_sendreq(roisender, "RC_LOAD", 0, pars);
144  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
145  printf("RFMaster:: roisender configured\n");
146 
147  return 0;
148 }
149 
150 int RFMaster::UnConfigure(NSMmsg* msgm, NSMcontext* msgc)
151 {
152  int pars[10];
153  pars[0] = msgm->pars[0];
154  pars[1] = msgm->pars[1];
155 
156  // Unconfigure RoiSender
157  char* roisender = m_conf->getconf("roisender", "nodename");
158  RFNSM_Status::Instance().set_flag(0);
159  // b2nsm_sendreq(roisender, "RF_UNCONFIGURE", 0, pars);
160  b2nsm_sendreq(roisender, "RC_ABORT", 0, pars);
161  while (RFNSM_Status::Instance().get_flag() == 1) b2nsm_wait(1);
162 
163  // Unconfigure collector
164  char* collector = m_conf->getconf("collector", "nodename");
165  RFNSM_Status::Instance().set_flag(0);
166  // b2nsm_sendreq(collector, "RF_UNCONFIGURE", 0, pars);
167  b2nsm_sendreq(collector, "RC_ABORT", 0, pars);
168  while (RFNSM_Status::Instance().get_flag() == 1) b2nsm_wait(1);
169 
170  // Unconfigure event processors
171  int maxnodes = m_conf->getconfi("processor", "nnodes");
172  int idbase = m_conf->getconfi("processor", "idbase");
173  char* hostbase = m_conf->getconf("processor", "nodebase");
174  char* badlist = m_conf->getconf("processor", "badlist");
175 
176  char hostnode[512], idname[3];
177  RFNSM_Status::Instance().set_flag(0);
178  int nnodes = 0;
179  for (int i = 0; i < maxnodes; i++) {
180  sprintf(idname, "%2.2d", idbase + i);
181  if (badlist == NULL ||
182  strstr(badlist, idname) == 0) {
183  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
184  // b2nsm_sendreq(hostnode, "RF_UNCONFIGURE", 0, pars);
185  b2nsm_sendreq(hostnode, "RC_ABORT", 0, pars);
186  nnodes++;
187  }
188  }
189 #ifdef DESY
190  b2nsm_wait(5);
191 #else
192  // printf ( "RFMaster : Unconfigure : started. - nnodes=%d, flag=%d\n",
193  // nnodes, RFNSM_Status::Instance().get_flag() );
194  while (RFNSM_Status::Instance().get_flag() != nnodes) {
195  b2nsm_wait(1);
196  // printf ( "RFMaster : Unconfigure : in prog. - nnodes=%d, flag=%d\n",
197  // nnodes, RFNSM_Status::Instance().get_flag() );
198  }
199  // printf ( "RFMaster : Unconfigure - done: nnodes=%d, flag=%d\n",
200  // nnodes, RFNSM_Status::Instance().get_flag() );
201 #endif
202 
203  // Unconfigure DqmServer
204  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
205  RFNSM_Status::Instance().set_flag(0);
206  // b2nsm_sendreq(dqmserver, "RF_UNCONFIGURE", 0, pars);
207  b2nsm_sendreq(dqmserver, "RC_ABORT", 0, pars);
208  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
209 
210  // Unconfigure distributor
211  char* distributor = m_conf->getconf("distributor", "nodename");
212  RFNSM_Status::Instance().set_flag(0);
213  // b2nsm_sendreq(distributor, "RF_UNCONFIGURE", 0, pars);
214  b2nsm_sendreq(distributor, "RC_ABORT", 0, pars);
215  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
216 
217  printf("RFMaster : Unconfigure done.\n");
218  fflush(stdout);
219 
220  return 0;
221 
222 }
223 
224 int RFMaster::Start(NSMmsg* msgm, NSMcontext* msgc)
225 {
226  int pars[10];
227  pars[0] = msgm->pars[0];
228  pars[1] = msgm->pars[1];
229  printf("RFMaster : Start exp=%d, run=%d\n", pars[0], pars[1]);
230  fflush(stdout);
231 
232  // 0. Start DQMserver
233  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
234  RFNSM_Status::Instance().set_flag(0);
235  // b2nsm_sendreq(dqmserver, "RF_STOP", 0, pars);
236  b2nsm_sendreq(dqmserver, "RC_START", 2, pars);
237  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
238 
239  // 1. Start worker nodes
240  int maxnodes = m_conf->getconfi("processor", "nnodes");
241  int idbase = m_conf->getconfi("processor", "idbase");
242  char* hostbase = m_conf->getconf("processor", "nodebase");
243  char* badlist = m_conf->getconf("processor", "badlist");
244 
245  char hostnode[512], idname[3];
246  RFNSM_Status::Instance().set_flag(0);
247  int nnodes = 0;
248  for (int i = 0; i < maxnodes; i++) {
249  sprintf(idname, "%2.2d", idbase + i);
250  if (badlist == NULL ||
251  strstr(badlist, idname) == 0) {
252  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
253  // b2nsm_sendreq(hostnode, "RF_STOP", 0, pars);
254  b2nsm_sendreq(hostnode, "RC_START", 2, pars);
255  nnodes++;
256  }
257  }
258 
259  return 0;
260 }
261 
262 int RFMaster::Stop(NSMmsg* msg, NSMcontext*)
263 {
264  int pars[10];
265  pars[0] = msg->pars[0];
266  pars[1] = msg->pars[1];
267 
268  // 1. Stop worker nodes
269  // Unconfigure event processors
270  int maxnodes = m_conf->getconfi("processor", "nnodes");
271  int idbase = m_conf->getconfi("processor", "idbase");
272  char* hostbase = m_conf->getconf("processor", "nodebase");
273  char* badlist = m_conf->getconf("processor", "badlist");
274 
275  char hostnode[512], idname[3];
276  RFNSM_Status::Instance().set_flag(0);
277  int nnodes = 0;
278  for (int i = 0; i < maxnodes; i++) {
279  sprintf(idname, "%2.2d", idbase + i);
280  if (badlist == NULL ||
281  strstr(badlist, idname) == 0) {
282  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
283  // b2nsm_sendreq(hostnode, "RF_STOP", 0, pars);
284  b2nsm_sendreq(hostnode, "RC_STOP", 0, pars);
285  nnodes++;
286  }
287  }
288 #ifdef DESY
289  b2nsm_wait(5);
290 #else
291  while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
292 #endif
293 
294  // 2. Stop DqmServer node
295  // Unconfigure DqmServer
296  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
297  RFNSM_Status::Instance().set_flag(0);
298  // b2nsm_sendreq(dqmserver, "RF_STOP", 0, pars);
299  b2nsm_sendreq(dqmserver, "RC_STOP", 0, pars);
300  while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
301 
302  printf("RFMaster : Stopped. exp=%d, run=%d\n", pars[0], pars[1]);
303  fflush(stdout);
304 
305  return 0;
306 }
307 
308 
309 int RFMaster::Restart(NSMmsg*, NSMcontext*)
310 {
311  int* pars;
312 
313  // 0. Configure DqmServer
314  char* dqmserver = m_conf->getconf("dqmserver", "nodename");
315  // b2nsm_sendreq(dqmserver, "RF_RESTART", 0, pars);
316  b2nsm_sendreq(dqmserver, "RC_RECOVER", 0, pars);
317  sleep(2);
318 
319  // 1. Configure distributor
320  char* distributor = m_conf->getconf("distributor", "nodename");
321  // b2nsm_sendreq(distributor, "RF_RESTART", 0, pars);
322  b2nsm_sendreq(distributor, "RC_RECOVER", 0, pars);
323  sleep(2);
324 
325  // 2. Configure event processors
326  int maxnodes = m_conf->getconfi("processor", "nnodes");
327  int idbase = m_conf->getconfi("processor", "idbase");
328  char* hostbase = m_conf->getconf("processor", "hostbase");
329  char* badlist = m_conf->getconf("processor", "badlist");
330 
331  char hostnode[512], idname[3];
332  for (int i = 0; i < maxnodes; i++) {
333  sprintf(idname, "%2.2d", idbase + i);
334  if (badlist == NULL ||
335  strstr(badlist, idname) == 0) {
336  sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
337  // b2nsm_sendreq(hostnode, "RF_RESTART", 0, pars);
338  b2nsm_sendreq(hostnode, "RC_RECOVER", 0, pars);
339  }
340  }
341  sleep(2);
342 
343  // 3. Configure collector
344  char* collector = m_conf->getconf("collector", "nodename");
345  // b2nsm_sendreq(collector, "RF_RESTART", 0, pars);
346  b2nsm_sendreq(collector, "RC_RECOVER", 0, pars);
347 
348  return 0;
349 }
350 
351 // Server function
352 
353 void RFMaster::monitor_loop()
354 {
355  while (true) {
356  b2nsm_wait(10);
357  }
358 }
359 
360 
361 
Abstract base class for different kinds of events.
Definition: nsm2.h:224