Belle II Software development
RFMaster.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
8
#include "daq/rfarm/manager/RFMaster.h"
#include "daq/rfarm/manager/RFNSM.h"

#include <nsm2/belle2nsm.h>

#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstdio>
#include <cstring>
19
20using namespace std;
21using namespace Belle2;
22
23//#define DESY
24
25// Main
26RFMaster::RFMaster(string conffile)
27{
28 // 0. Initialize configuration manager
29 m_conf = new RFConf(conffile.c_str());
30 char* nodename = m_conf->getconf("master", "nodename");
31 // char nodename[256];
32 // gethostname ( nodename, sizeof(nodename) );
33
34 // 1. Initialize local shared memory
35 // m_shm = new RFSharedMem(nodename);
36
37 // 2. Set execution directory
38 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/master";
39
40 mkdir(execdir.c_str(), 0755);
41 chdir(execdir.c_str());
42
43 // 3. Initialize LogManager
44 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
45
46 // 4. Leave PID file
47 FILE* f = fopen("pid.data", "w");
48 fprintf(f, "%d", getpid());
49 fclose(f);
50
51}
52
53RFMaster::~RFMaster()
54{
55 delete m_log;
56 // delete m_shm;
57 delete m_conf;
58}
59
60void RFMaster::Hook_Message_Handlers()
61{
62 // 5. Hook message handlers
63 if (b2nsm_callback("LOG", Log_Handler) < 0) {
64 fprintf(stderr, "RFMaster : hooking INFO handler failed, %s\n",
65 b2nsm_strerror());
66 }
67 printf("RFMaster: Message Handlers - Ready\n");
68
69}
70
71// NSM callback functions for message
72
73void RFMaster::Log_Handler(NSMmsg* /*msg*/, NSMcontext* /*ctx*/)
74{
75 // printf ( "RFMaster : [INFO] received\n" );
76 // b2nsm_ok ( msg, "INFO!!", NULL );
77 // fflush ( stdout );
78}
79
80
81// Functions hooked up by NSM2
82
83int RFMaster::Configure(NSMmsg*, NSMcontext*)
84{
85 int* pars = nullptr;
86
87 // 0. Configure DqmServer
88 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
89 RFNSM_Status::Instance().set_flag(0);
90 // b2nsm_sendreq(dqmserver, "RF_CONFIGURE", 0, pars);
91 b2nsm_sendreq(dqmserver, "RC_LOAD", 0, pars);
92 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
93 printf("RFMaster:: dqmserver configured\n");
94 // sleep(2);
95
96 // 1. Configure distributor
97 char* distributor = m_conf->getconf("distributor", "nodename");
98 RFNSM_Status::Instance().set_flag(0);
99 // b2nsm_sendreq(distributor, "RF_CONFIGURE", 0, pars);
100 b2nsm_sendreq(distributor, "RC_LOAD", 0, pars);
101 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
102 printf("RFMaster:: distributor configured\n");
103
104 // sleep(2);
105
106 // 2. Configure event processors
107 int maxnodes = m_conf->getconfi("processor", "nnodes");
108 int idbase = m_conf->getconfi("processor", "idbase");
109 char* hostbase = m_conf->getconf("processor", "nodebase");
110 char* badlist = m_conf->getconf("processor", "badlist");
111
112 char hostnode[512], idname[3];
113 int nnodes = 0;
114 RFNSM_Status::Instance().set_flag(0);
115 for (int i = 0; i < maxnodes; i++) {
116 sprintf(idname, "%2.2d", idbase + i);
117 if (badlist == NULL ||
118 strstr(badlist, idname) == 0) {
119 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
120 // b2nsm_sendreq(hostnode, "RF_CONFIGURE", 0, pars);
121 b2nsm_sendreq(hostnode, "RC_LOAD", 0, pars);
122 nnodes++;
123 }
124 }
125 // while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
126 sleep(10);
127
128 printf("RFMaster:: distributor configured\n");
129
130 // 3. Configure collector
131 char* collector = m_conf->getconf("collector", "nodename");
132 RFNSM_Status::Instance().set_flag(0);
133 // b2nsm_sendreq(collector, "RF_CONFIGURE", 0, pars);
134 b2nsm_sendreq(collector, "RC_LOAD", 0, pars);
135 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
136 printf("RFMaster:: collector configured\n");
137 // sleep ( 5 );
138
139 // 4. Configure
140 char* roisender = m_conf->getconf("roisender", "nodename");
141 RFNSM_Status::Instance().set_flag(0);
142 // b2nsm_sendreq(roisender, "RF_CONFIGURE", 0, pars);
143 b2nsm_sendreq(roisender, "RC_LOAD", 0, pars);
144 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
145 printf("RFMaster:: roisender configured\n");
146
147 return 0;
148}
149
150int RFMaster::UnConfigure(NSMmsg* msgm, NSMcontext* /*msgc*/)
151{
152 int pars[10];
153 pars[0] = msgm->pars[0];
154 pars[1] = msgm->pars[1];
155
156 // Unconfigure RoiSender
157 char* roisender = m_conf->getconf("roisender", "nodename");
158 RFNSM_Status::Instance().set_flag(0);
159 // b2nsm_sendreq(roisender, "RF_UNCONFIGURE", 0, pars);
160 b2nsm_sendreq(roisender, "RC_ABORT", 0, pars);
161 while (RFNSM_Status::Instance().get_flag() == 1) b2nsm_wait(1);
162
163 // Unconfigure collector
164 char* collector = m_conf->getconf("collector", "nodename");
165 RFNSM_Status::Instance().set_flag(0);
166 // b2nsm_sendreq(collector, "RF_UNCONFIGURE", 0, pars);
167 b2nsm_sendreq(collector, "RC_ABORT", 0, pars);
168 while (RFNSM_Status::Instance().get_flag() == 1) b2nsm_wait(1);
169
170 // Unconfigure event processors
171 int maxnodes = m_conf->getconfi("processor", "nnodes");
172 int idbase = m_conf->getconfi("processor", "idbase");
173 char* hostbase = m_conf->getconf("processor", "nodebase");
174 char* badlist = m_conf->getconf("processor", "badlist");
175
176 char hostnode[512], idname[3];
177 RFNSM_Status::Instance().set_flag(0);
178 int nnodes = 0;
179 for (int i = 0; i < maxnodes; i++) {
180 sprintf(idname, "%2.2d", idbase + i);
181 if (badlist == NULL ||
182 strstr(badlist, idname) == 0) {
183 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
184 // b2nsm_sendreq(hostnode, "RF_UNCONFIGURE", 0, pars);
185 b2nsm_sendreq(hostnode, "RC_ABORT", 0, pars);
186 nnodes++;
187 }
188 }
189#ifdef DESY
190 b2nsm_wait(5);
191#else
192 // printf ( "RFMaster : Unconfigure : started. - nnodes=%d, flag=%d\n",
193 // nnodes, RFNSM_Status::Instance().get_flag() );
194 while (RFNSM_Status::Instance().get_flag() != nnodes) {
195 b2nsm_wait(1);
196 // printf ( "RFMaster : Unconfigure : in prog. - nnodes=%d, flag=%d\n",
197 // nnodes, RFNSM_Status::Instance().get_flag() );
198 }
199 // printf ( "RFMaster : Unconfigure - done: nnodes=%d, flag=%d\n",
200 // nnodes, RFNSM_Status::Instance().get_flag() );
201#endif
202
203 // Unconfigure DqmServer
204 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
205 RFNSM_Status::Instance().set_flag(0);
206 // b2nsm_sendreq(dqmserver, "RF_UNCONFIGURE", 0, pars);
207 b2nsm_sendreq(dqmserver, "RC_ABORT", 0, pars);
208 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
209
210 // Unconfigure distributor
211 char* distributor = m_conf->getconf("distributor", "nodename");
212 RFNSM_Status::Instance().set_flag(0);
213 // b2nsm_sendreq(distributor, "RF_UNCONFIGURE", 0, pars);
214 b2nsm_sendreq(distributor, "RC_ABORT", 0, pars);
215 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
216
217 printf("RFMaster : Unconfigure done.\n");
218 fflush(stdout);
219
220 return 0;
221
222}
223
224int RFMaster::Start(NSMmsg* msgm, NSMcontext* /*msgc*/)
225{
226 int pars[10];
227 pars[0] = msgm->pars[0];
228 pars[1] = msgm->pars[1];
229 printf("RFMaster : Start exp=%d, run=%d\n", pars[0], pars[1]);
230 fflush(stdout);
231
232 // 0. Start DQMserver
233 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
234 RFNSM_Status::Instance().set_flag(0);
235 // b2nsm_sendreq(dqmserver, "RF_STOP", 0, pars);
236 b2nsm_sendreq(dqmserver, "RC_START", 2, pars);
237 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
238
239 // 1. Start worker nodes
240 int maxnodes = m_conf->getconfi("processor", "nnodes");
241 int idbase = m_conf->getconfi("processor", "idbase");
242 char* hostbase = m_conf->getconf("processor", "nodebase");
243 char* badlist = m_conf->getconf("processor", "badlist");
244
245 char hostnode[512], idname[3];
246 RFNSM_Status::Instance().set_flag(0);
247 int nnodes = 0;
248 for (int i = 0; i < maxnodes; i++) {
249 sprintf(idname, "%2.2d", idbase + i);
250 if (badlist == NULL ||
251 strstr(badlist, idname) == 0) {
252 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
253 // b2nsm_sendreq(hostnode, "RF_STOP", 0, pars);
254 b2nsm_sendreq(hostnode, "RC_START", 2, pars);
255 nnodes++;
256 }
257 }
258
259 return 0;
260}
261
262int RFMaster::Stop(NSMmsg* msg, NSMcontext*)
263{
264 int pars[10];
265 pars[0] = msg->pars[0];
266 pars[1] = msg->pars[1];
267
268 // 1. Stop worker nodes
269 // Unconfigure event processors
270 int maxnodes = m_conf->getconfi("processor", "nnodes");
271 int idbase = m_conf->getconfi("processor", "idbase");
272 char* hostbase = m_conf->getconf("processor", "nodebase");
273 char* badlist = m_conf->getconf("processor", "badlist");
274
275 char hostnode[512], idname[3];
276 RFNSM_Status::Instance().set_flag(0);
277 int nnodes = 0;
278 for (int i = 0; i < maxnodes; i++) {
279 sprintf(idname, "%2.2d", idbase + i);
280 if (badlist == NULL ||
281 strstr(badlist, idname) == 0) {
282 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
283 // b2nsm_sendreq(hostnode, "RF_STOP", 0, pars);
284 b2nsm_sendreq(hostnode, "RC_STOP", 0, pars);
285 nnodes++;
286 }
287 }
288#ifdef DESY
289 b2nsm_wait(5);
290#else
291 while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
292#endif
293
294 // 2. Stop DqmServer node
295 // Unconfigure DqmServer
296 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
297 RFNSM_Status::Instance().set_flag(0);
298 // b2nsm_sendreq(dqmserver, "RF_STOP", 0, pars);
299 b2nsm_sendreq(dqmserver, "RC_STOP", 0, pars);
300 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
301
302 printf("RFMaster : Stopped. exp=%d, run=%d\n", pars[0], pars[1]);
303 fflush(stdout);
304
305 return 0;
306}
307
308
309int RFMaster::Restart(NSMmsg*, NSMcontext*)
310{
311 int* pars = nullptr;
312
313 // 0. Configure DqmServer
314 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
315 // b2nsm_sendreq(dqmserver, "RF_RESTART", 0, pars);
316 b2nsm_sendreq(dqmserver, "RC_RECOVER", 0, pars);
317 sleep(2);
318
319 // 1. Configure distributor
320 char* distributor = m_conf->getconf("distributor", "nodename");
321 // b2nsm_sendreq(distributor, "RF_RESTART", 0, pars);
322 b2nsm_sendreq(distributor, "RC_RECOVER", 0, pars);
323 sleep(2);
324
325 // 2. Configure event processors
326 int maxnodes = m_conf->getconfi("processor", "nnodes");
327 int idbase = m_conf->getconfi("processor", "idbase");
328 char* hostbase = m_conf->getconf("processor", "hostbase");
329 char* badlist = m_conf->getconf("processor", "badlist");
330
331 char hostnode[512], idname[3];
332 for (int i = 0; i < maxnodes; i++) {
333 sprintf(idname, "%2.2d", idbase + i);
334 if (badlist == NULL ||
335 strstr(badlist, idname) == 0) {
336 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
337 // b2nsm_sendreq(hostnode, "RF_RESTART", 0, pars);
338 b2nsm_sendreq(hostnode, "RC_RECOVER", 0, pars);
339 }
340 }
341 sleep(2);
342
343 // 3. Configure collector
344 char* collector = m_conf->getconf("collector", "nodename");
345 // b2nsm_sendreq(collector, "RF_RESTART", 0, pars);
346 b2nsm_sendreq(collector, "RC_RECOVER", 0, pars);
347
348 return 0;
349}
350
351// Server function
352
353void RFMaster::monitor_loop()
354{
355 while (true) {
356 b2nsm_wait(10);
357 }
358}
359
360
361
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224