Belle II Software development
RFRoiSender.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/rfarm/manager/RFRoiSender.h>
10
11#include <sys/stat.h>
12#include <sys/types.h>
13#include <sys/wait.h>
14
15#include <unistd.h>
16
17#include <csignal>
18#include <cstring>
19#include <string>
20
21using namespace Belle2;
22using namespace std;
23
24RFRoiSender::RFRoiSender(string conffile)
25{
26 // 0. Initialize configuration manager
27 m_conf = new RFConf(conffile.c_str());
28 char* nodename = m_conf->getconf("roisender", "nodename");
29 printf("RoiSender : nodename = %s\n", nodename);
30 fflush(stdout);
31 // char nodename[256];
32 // gethostname ( nodename, sizeof(nodename) );
33
34 // 1. Set execution directory
35 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/roisender";
36
37 mkdir(execdir.c_str(), 0755);
38 chdir(execdir.c_str());
39
40 // 2. Initialize local shared memory
41 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
42 string(nodename);
43 m_shm = new RFSharedMem((char*)shmname.c_str());
44 printf("RoiSender : shmname = %s\n", shmname.c_str());
45
46 // 3. Initialize process manager
47 m_proc = new RFProcessManager(nodename);
48
49 // 4. Initialize LogManager
50 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
51
52 // 5. Initialize data flow monitor
53 m_flow = new RFFlowStat((char*)shmname.c_str());
54
55 // 6. Clear PID list
56 m_pid_sender = 0;
57 m_pid_merger = 0;
58
59}
60
61RFRoiSender::~RFRoiSender()
62{
63 delete m_log;
64 delete m_proc;
65 delete m_conf;
66}
67
68// Functions hooked up by NSM2
69
70int RFRoiSender::Configure(NSMmsg* nsmm, NSMcontext* /*nsmc*/)
71{
72 // 0. Do you need RoI sender?
73 char* roisw = m_conf->getconf("roisender", "enabled");
74 if (nsmm->len > 0) {
75 roisw = (char*) nsmm->datap;
76 printf("Configure: roisender enable flag is overridden : %s\n", roisw);
77 }
78 if (strstr(roisw, "yes") == 0) return 0;
79
80 // 1. Run merger first
81 char* merger = m_conf->getconf("roisender", "merger");
82 char* mergerport = m_conf->getconf("roisender", "mergerport");
83 char* mergerhost = m_conf->getconf("roisender", "mergerhost");
84 char* onsenhost = m_conf->getconf("roisender", "onsenhost");
85 char* onsenport = m_conf->getconf("roisender", "onsenport");
86
87 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
88 string(m_conf->getconf("roisender", "nodename"));
89
90 // Note: merger should be run on a separate host in Belle2DAQ
91 if (strstr(merger, "none") == 0) {
92 char idbuf[3];
93 sprintf(idbuf, "%2.2d", RF_ROI_ID);
94 m_pid_merger = m_proc->Execute(merger, (char*)shmname.c_str(), idbuf, onsenhost, onsenport, mergerport);
95 sleep(2);
96 }
97
98 // 2. Run sender
99 char* sender = m_conf->getconf("roisender", "sender");
100 int nqueue = m_conf->getconfi("roisender", "nqueues");
101 // char* qbase = m_conf->getconf("roisender", "qnamebase");
102
103 char* arglist[20];
104 arglist[0] = mergerhost;
105 arglist[1] = mergerport;
106 int nargs = 2;
107 char roiqs[10][256];
108 for (int i = 0; i < nqueue; i++) {
109 sprintf(roiqs[i], "/roi%d", i);
110 arglist[i + 2] = roiqs[i];
111 nargs++;
112 }
113 // m_pid_sender = m_proc->Execute(sender, mergerhost, mergerport);
114 m_pid_sender = m_proc->Execute(sender, nargs, arglist);
115
116 return 0;
117
118}
119
120int RFRoiSender::UnConfigure(NSMmsg*, NSMcontext*)
121{
122 // system("killall merger_merge hltout2merger");
123 int statx;
124 if (m_pid_sender != 0) {
125 kill(m_pid_sender, SIGINT);
126 waitpid(m_pid_sender, &statx, 0);
127 m_pid_sender = 0;
128 }
129 if (m_pid_merger != 0) {
130 kill(m_pid_merger, SIGINT);
131 waitpid(m_pid_merger, &statx, 0);
132 m_pid_merger = 0;
133 }
134 printf("UnConfigure : done\n");
135 return 0;
136}
137
138int RFRoiSender::Start(NSMmsg*, NSMcontext*)
139{
140 return 0;
141}
142
143int RFRoiSender::Stop(NSMmsg*, NSMcontext*)
144{
145 return 0;
146}
147
148
149int RFRoiSender::Restart(NSMmsg*, NSMcontext*)
150{
151 printf("RFRoiSender : Restarted!!!!!\n");
152 /* Original Impl
153 if (m_pid_dqm != 0) {
154 kill(m_pid_dqm, SIGINT);
155 }
156 */
157 // system("killall merger_merge hltout2merger");
158 // fflush(stdout);
159 NSMmsg* nsmmsg = NULL;
160 NSMcontext* nsmcontext = NULL;
161 RFRoiSender::UnConfigure(nsmmsg, nsmcontext);
162 sleep(2);
163 RFRoiSender::Configure(nsmmsg, nsmcontext);
164 return 0;
165}
166
167// Server function
168
169void RFRoiSender::server()
170{
171 while (true) {
172 pid_t pid = m_proc->CheckProcess();
173 if (pid > 0) {
174 printf("RFRoiSender : process dead. pid=%d\n", pid);
175 if (pid == m_pid_sender) {
176 m_log->Fatal("RFRoiSender : hltout2merger dead. pid = %d\n", pid);
177 // RFNSM_Status::Instance().set_state(RFSTATE_ERROR); // << will this really set the state? ERROR is not defined anyway
178 } else if (pid == m_pid_merger) {
179 m_log->Fatal("RFRoiSender : merger2merge dead. pid = %d\n", pid);
180 // RFNSM_Status::Instance().set_state(RFSTATE_ERROR); // << will this really set the state? ERROR is not defined anyway
181 }
182 }
183
184 int st = m_proc->CheckOutput();
185 if (st < 0) {
186 perror("RFRoiSender::server");
187 // exit ( -1 );
188 } else if (st > 0) {
189 m_log->ProcessLog(m_proc->GetFd());
190 }
191 m_flow->fillNodeInfo(RF_ROI_ID, GetNodeInfo(), true);
192 }
193}
194
195void RFRoiSender::cleanup()
196{
197 printf("RFRoiSender : cleaning up\n");
198 UnConfigure(NULL, NULL);
199 printf("RFRoiSender: Done. Exitting\n");
200 exit(-1);
201}
int Execute(char *script, int nargs, char **args)
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224