Belle II Software development
ERecoMaster.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include "daq/expreco/ERecoMaster.h"
10
11#include "daq/rfarm/manager/RFNSM.h"
12
13#include <nsm2/belle2nsm.h>
14
15#include <sys/stat.h>
16#include <sys/types.h>
17#include <unistd.h>
18
19#include <cstring>
20
21using namespace std;
22using namespace Belle2;
23
24//#define DESY
25
26
27// Main
28ERecoMaster::ERecoMaster(string conffile)
29{
30 // 0. Initialize configuration manager
31 m_conf = new RFConf(conffile.c_str());
32 char* nodename = m_conf->getconf("master", "nodename");
33 // char nodename[256];
34 // gethostname ( nodename, sizeof(nodename) );
35
36 // 1. Initialize local shared memory
37 // m_shm = new RFSharedMem(nodename);
38
39 // 2. Set execution directory
40 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/master";
41
42 mkdir(execdir.c_str(), 0755);
43 chdir(execdir.c_str());
44
45 // 3. Initialize LogManager
46 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
47
48 // 4. Leave PID file
49 FILE* f = fopen("pid.data", "w");
50 fprintf(f, "%d", getpid());
51 fclose(f);
52
53
54}
55
56ERecoMaster::~ERecoMaster()
57{
58 delete m_log;
59 // delete m_shm;
60 delete m_conf;
61}
62
63void ERecoMaster::Hook_Message_Handlers()
64{
65 // 5. Hook message handlers
66 if (b2nsm_callback("LOG", Log_Handler) < 0) {
67 fprintf(stderr, "ERecoMaster : hooking INFO handler failed, %s\n",
68 b2nsm_strerror());
69 }
70 printf("ERecoMaster: Message Handlers - Ready\n");
71
72}
73
74// NSM callback functions for message
75
76void ERecoMaster::Log_Handler(NSMmsg* /*msg*/, NSMcontext* /*ctx*/)
77{
78 // printf ( "ERecoMaster : [INFO] received\n" );
79 // b2nsm_ok ( msg, "INFO!!", NULL );
80 // fflush ( stdout );
81}
82
83
84// Functions hooked up by NSM2
85
86int ERecoMaster::Configure(NSMmsg*, NSMcontext*)
87{
88 int* pars = nullptr;
89
90 // 0. Configure DqmServer
91 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
92 RFNSM_Status::Instance().set_flag(0);
93 // b2nsm_sendreq(dqmserver, "RF_CONFIGURE", 0, pars);
94 b2nsm_sendreq(dqmserver, "RC_LOAD", 0, pars);
95 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
96 printf("ERecoMaster:: dqmserver configured\n");
97 sleep(2);
98
99 // 1. Configure distributor
100 char* distributor = m_conf->getconf("distributor", "nodename");
101 RFNSM_Status::Instance().set_flag(0);
102 // b2nsm_sendreq(distributor, "RF_CONFIGURE", 0, pars);
103 b2nsm_sendreq(distributor, "RC_LOAD", 0, pars);
104 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
105 printf("ERecoMaster:: distributor configured\n");
106
107 sleep(2);
108
109 // 2. Configure event processors
110 int maxnodes = m_conf->getconfi("processor", "nnodes");
111 int idbase = m_conf->getconfi("processor", "idbase");
112 char* hostbase = m_conf->getconf("processor", "nodebase");
113 char* badlist = m_conf->getconf("processor", "badlist");
114
115 char hostnode[512], idname[3];
116 int nnodes = 0;
117 RFNSM_Status::Instance().set_flag(0);
118 for (int i = 0; i < maxnodes; i++) {
119 sprintf(idname, "%2.2d", idbase + i);
120 if (badlist == NULL ||
121 strstr(badlist, idname) == 0) {
122 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
123 // b2nsm_sendreq(hostnode, "RF_CONFIGURE", 0, pars);
124 b2nsm_sendreq(hostnode, "RC_LOAD", 0, pars);
125 nnodes++;
126 }
127 }
128 while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
129
130 sleep(10);
131
132 // 3. Configure event sampler
133 char* sampler = m_conf->getconf("eventsampler", "nodename");
134 RFNSM_Status::Instance().set_flag(0);
135 // b2nsm_sendreq(distributor, "RF_CONFIGURE", 0, pars);
136 b2nsm_sendreq(sampler, "RC_LOAD", 0, pars);
137 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
138 printf("ERecoMaster:: sampler configured\n");
139
140 sleep(2);
141
142
143 printf("ERecoMaster:: event processors configured\n");
144
145 return 0;
146}
147
148int ERecoMaster::UnConfigure(NSMmsg*, NSMcontext*)
149{
150 int* pars = nullptr;
151
152 // Unconfigure sampler
153 char* sampler = m_conf->getconf("eventsampler", "nodename");
154 RFNSM_Status::Instance().set_flag(0);
155 // b2nsm_sendreq(distributor, "RF_UNCONFIGURE", 0, pars);
156 b2nsm_sendreq(sampler, "RC_ABORT", 0, pars);
157 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
158 printf("ErecoMaster: sampler unconfigured.\n");
159
160 // Unconfigure event processors
161 int maxnodes = m_conf->getconfi("processor", "nnodes");
162 int idbase = m_conf->getconfi("processor", "idbase");
163 char* hostbase = m_conf->getconf("processor", "nodebase");
164 char* badlist = m_conf->getconf("processor", "badlist");
165
166 char hostnode[512], idname[3];
167 RFNSM_Status::Instance().set_flag(0);
168 int nnodes = 0;
169 for (int i = 0; i < maxnodes; i++) {
170 sprintf(idname, "%2.2d", idbase + i);
171 if (badlist == NULL ||
172 strstr(badlist, idname) == 0) {
173 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
174 // b2nsm_sendreq(hostnode, "RF_UNCONFIGURE", 0, pars);
175 b2nsm_sendreq(hostnode, "RC_ABORT", 0, pars);
176 nnodes++;
177 }
178 }
179#ifdef DESY
180 b2nsm_wait(5);
181#else
182 while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
183#endif
184 printf("ERecoMaster: eventprocessors unconfigured.\n");
185
186 // Unconfigure distributor
187 char* distributor = m_conf->getconf("distributor", "nodename");
188 RFNSM_Status::Instance().set_flag(0);
189 // b2nsm_sendreq(distributor, "RF_UNCONFIGURE", 0, pars);
190 b2nsm_sendreq(distributor, "RC_ABORT", 0, pars);
191 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
192 printf("ErecoMaster: distributor unconfigured.\n");
193
194 // Unconfigure DqmServer
195 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
196 RFNSM_Status::Instance().set_flag(0);
197 // b2nsm_sendreq(dqmserver, "RF_UNCONFIGURE", 0, pars);
198 b2nsm_sendreq(dqmserver, "RC_ABORT", 0, pars);
199 // while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
200 sleep(5);
201 printf("ErecoMaster: dqmserver unconfigured.\n");
202
203 return 0;
204
205}
206
207int ERecoMaster::Start(NSMmsg*, NSMcontext*)
208{
209 return 0;
210}
211
212int ERecoMaster::Stop(NSMmsg* msg, NSMcontext*)
213{
214 int pars[10];
215 pars[0] = msg->pars[0];
216 pars[1] = msg->pars[1];
217
218 // 1. Stop worker nodes
219 // Unconfigure event processors
220 int maxnodes = m_conf->getconfi("processor", "nnodes");
221 int idbase = m_conf->getconfi("processor", "idbase");
222 char* hostbase = m_conf->getconf("processor", "nodebase");
223 char* badlist = m_conf->getconf("processor", "badlist");
224
225 char hostnode[512], idname[3];
226 RFNSM_Status::Instance().set_flag(0);
227 int nnodes = 0;
228 for (int i = 0; i < maxnodes; i++) {
229 sprintf(idname, "%2.2d", idbase + i);
230 if (badlist == NULL ||
231 strstr(badlist, idname) == 0) {
232 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
233 // b2nsm_sendreq(hostnode, "RF_STOP", 0, pars);
234 b2nsm_sendreq(hostnode, "RC_STOP", 0, pars);
235 nnodes++;
236 }
237 }
238#ifdef DESY
239 b2nsm_wait(5);
240#else
241 while (RFNSM_Status::Instance().get_flag() != nnodes) b2nsm_wait(1);
242#endif
243
244 // 2. Stop DqmServer node
245 // Unconfigure DqmServer
246 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
247 RFNSM_Status::Instance().set_flag(0);
248 // b2nsm_sendreq(dqmserver, "RF_STOP", 0, pars);
249 b2nsm_sendreq(dqmserver, "RC_STOP", 0, pars);
250 while (RFNSM_Status::Instance().get_flag() == 0) b2nsm_wait(1);
251
252 return 0;
253}
254
255
256int ERecoMaster::Restart(NSMmsg*, NSMcontext*)
257{
258 int* pars = nullptr;
259
260 // 0. Configure DqmServer
261 char* dqmserver = m_conf->getconf("dqmserver", "nodename");
262 // b2nsm_sendreq(dqmserver, "RF_RESTART", 0, pars);
263 b2nsm_sendreq(dqmserver, "RC_RECOVER", 0, pars);
264 sleep(2);
265
266 // 1. Configure distributor
267 char* distributor = m_conf->getconf("distributor", "nodename");
268 // b2nsm_sendreq(distributor, "RF_RESTART", 0, pars);
269 b2nsm_sendreq(distributor, "RC_RECOVER", 0, pars);
270 sleep(2);
271
272 // 2. Configure event processors
273 int maxnodes = m_conf->getconfi("processor", "nnodes");
274 int idbase = m_conf->getconfi("processor", "idbase");
275 char* hostbase = m_conf->getconf("processor", "hostbase");
276 char* badlist = m_conf->getconf("processor", "badlist");
277
278 char hostnode[512], idname[3];
279 for (int i = 0; i < maxnodes; i++) {
280 sprintf(idname, "%2.2d", idbase + i);
281 if (badlist == NULL ||
282 strstr(badlist, idname) == 0) {
283 sprintf(hostnode, "evp_%s%2.2d", hostbase, idbase + i);
284 // b2nsm_sendreq(hostnode, "RF_RESTART", 0, pars);
285 b2nsm_sendreq(hostnode, "RC_RECOVER", 0, pars);
286 }
287 }
288 sleep(2);
289
290 return 0;
291}
292
293// Server function
294
295void ERecoMaster::monitor_loop()
296{
297 while (true) {
298 b2nsm_wait(10);
299 }
300}
301
302
303
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224