Belle II Software development
RFEventServer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include "daq/rfarm/manager/RFEventServer.h"
10
11#include <sys/stat.h>
12#include <sys/types.h>
13#include <sys/wait.h>
14#include <unistd.h>
15
16#include <csignal>
17#include <cstring>
18#include <iostream>
19
20#define RFEVSOUT stdout
21
22using namespace std;
23using namespace Belle2;
24
25RFEventServer* RFEventServer::s_instance = 0;
26//RFServerBase* RFServerBase::s_instance = 0;
27
28RFEventServer::RFEventServer(string conffile)
29{
30 // 0. Initialize configuration manager
31 m_conf = new RFConf(conffile.c_str());
32 char* nodename = m_conf->getconf("distributor", "nodename");
33 // char nodename[256];
34 // gethostname ( nodename, sizeof(nodename) );
35
36 // 1. Set execution directory
37 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
38
39 mkdir(execdir.c_str(), 0755);
40 chdir(execdir.c_str());
41
42 // 2. Initialize local shared memory
43 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
44 string(m_conf->getconf("distributor", "nodename"));
45 m_shm = new RFSharedMem((char*)shmname.c_str());
46
47 // 3. Initialize process manager
48 m_proc = new RFProcessManager(nodename);
49
50 // 4. Initialize RingBuffers
51 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
52 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
53 string(m_conf->getconf("distributor", "ringbuffer"));
54 int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
55 m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
56
57 // 5. Initialize LogManager
58 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
59
60 // 6. Initialize data flow monitor
61 m_flow = new RFFlowStat((char*)shmname.c_str());
62
63 // 7. Clear PID list
64 m_pid_recv = 0;
65 for (int i = 0; i < m_nnodes; i++)
66 m_pid_sender[i] = 0 ;
67
68}
69
70RFEventServer::~RFEventServer()
71{
72 delete m_log;
73 delete m_proc;
74 delete m_shm;
75 delete m_conf;
76 delete m_flow;
77 delete m_rbufin;
78}
79
80// Access to Singleton
81RFEventServer& RFEventServer::Create(const string& conffile)
82{
83 if (!s_instance) {
84 s_instance = new RFEventServer(conffile);
85 }
86 return *s_instance;
87}
88
89RFEventServer& RFEventServer::Instance()
90{
91 return *s_instance;
92}
93
94
95// Functions hooked up by NSM2
96
97int RFEventServer::Configure(NSMmsg*, NSMcontext*)
98{
99 // 0. Global parameters
100 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
101 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
102 string(m_conf->getconf("distributor", "ringbuffer"));
103
104 // char* shmname = m_conf->getconf("distributor", "nodename");
105 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
106 string(m_conf->getconf("distributor", "nodename"));
107
108 // 2. Run sender
109 m_nnodes = 0;
110 int maxnodes = m_conf->getconfi("processor", "nnodes");
111 int idbase = m_conf->getconfi("processor", "idbase");
112 // char* hostbase = m_conf->getconf("processor", "hostbase");
113 char* badlist = m_conf->getconf("processor", "badlist");
114
115 char* sender = m_conf->getconf("distributor", "sender", "script");
116 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
117
118 char idname[3], shmid[3];
119 for (int i = 0; i < maxnodes; i++) {
120 sprintf(idname, "%2.2d", idbase + i);
121 sprintf(shmid, "%2.2d", i);
122 if (badlist == NULL ||
123 strstr(badlist, idname) == 0) {
124 int port = (idbase + i) + portbase;
125 char portchar[256];
126 sprintf(portchar, "%d", port);
127 m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
128 m_flow->clear(i);
129 m_nnodes++;
130 }
131 }
132
133 // 1. Run receiver
134 char* srcG = m_conf->getconf("distributor", "source");
135 if (strstr(srcG, "net") != 0) {
136 // Run receiver
137 char* receiver = m_conf->getconf("distributor", "receiver", "script");
138 char* src = m_conf->getconf("distributor", "receiver", "host");
139 char* port = m_conf->getconf("distributor", "receiver", "port");
140 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
141 char idbuf[3];
142 sprintf(idbuf, "%2.2d", RF_INPUT_ID);
143 m_pid_recv = m_proc->Execute(receiver, (char*)ringbuf.c_str(), src, port, (char*)shmname.c_str(), idbuf);
144 m_flow->clear(RF_INPUT_ID);
145 } else if (strstr(srcG, "file") != 0) {
146 // Run file reader
147 char* filein = m_conf->getconf("distributor", "fileinput", "script");
148 char* file = m_conf->getconf("distributor", "fileinput", "filename");
149 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
150 char* nnodechr = m_conf->getconf("distributor", "nnodes");
151 m_pid_recv = m_proc->Execute(filein, (char*)ringbuf.c_str(), file, nnodechr);
152 }
153
154 m_rbufin->forceClear();
155
156 // else none
157 return 0;
158}
159
160int RFEventServer::UnConfigure(NSMmsg*, NSMcontext*)
161{
162 // system("killall sock2rbr rb2sockr");
163 int status;
164 if (m_pid_recv != 0) {
165 kill(m_pid_recv, SIGINT);
166 waitpid(m_pid_recv, &status, 0);
167 }
168 for (int i = 0; i < m_nnodes; i++) {
169 if (m_pid_sender[i] != 0) {
170 printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
171 // kill(m_pid_sender[i], SIGINT);
172 kill(m_pid_sender[i], SIGINT);
173 waitpid(m_pid_sender[i], &status, 0);
174 }
175 }
176 // Clear RingBuffer
177 m_rbufin->forceClear();
178
179 // Clear process list
180 m_flow->fillProcessStatus(GetNodeInfo());
181
182 printf("Unconfigure : done\n");
183 return 0;
184}
185
186int RFEventServer::Start(NSMmsg*, NSMcontext*)
187{
188 // m_rbufin->clear();
189 // m_rbufin->forceClear();
190 return 0;
191}
192
193int RFEventServer::Stop(NSMmsg*, NSMcontext*)
194{
195 m_rbufin->clear();
196 return 0;
197}
198
199
200int RFEventServer::Restart(NSMmsg*, NSMcontext*)
201{
202 printf("RFEventServer : Restarting!!!!!\n");
203 /* Original impl.
204 if (m_pid_recv != 0) {
205 kill(m_pid_recv, SIGINT);
206 }
207 for (int i = 0; i < m_nnodes; i++) {
208 if (m_pid_sender[i] != 0) {
209 printf("RFEventServer:: killing sender pid=%d\n", m_pid_sender[i]);
210 kill(m_pid_sender[i], SIGINT);
211 }
212 }
213 // Simple implementation
214 system("killall sock2rbr rb2sockr");
215 fflush(stdout);
216 */
217 NSMmsg* nsmmsg = NULL;
218 NSMcontext* nsmcontext = NULL;
219 RFEventServer::UnConfigure(nsmmsg, nsmcontext);
220 sleep(2);
221 RFEventServer::Configure(nsmmsg, nsmcontext);
222 return 0;
223}
224
225// Server function
226
227void RFEventServer::server()
228{
229 // int nevt = 0;
230 m_flow->fillProcessStatus(GetNodeInfo());
231
232 while (true) {
233 int sender_id = 0;
234 pid_t pid = m_proc->CheckProcess();
235 if (pid > 0) {
236 printf("RFEventServer : process dead. pid = %d\n", pid);
237 if (pid == m_pid_recv) {
238 m_log->Fatal("RFEventServer : receiver process dead. pid=%d\n", pid);
239 m_pid_recv = 0;
240 } else {
241 for (int i = 0; i < m_nnodes; i++) {
242 if (pid == m_pid_sender[i]) {
243 m_log->Fatal("RFEventServer : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
244 m_pid_sender[i] = 0;
245 sender_id = i;
246 }
247 }
248 }
249 }
250 int st = m_proc->CheckOutput();
251 if (st < 0) {
252 perror("RFEventServer::server");
253 // exit ( -1 );
254 } else if (st > 0) {
255 m_log->ProcessLog(m_proc->GetFd());
256 }
257 m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
258 m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
259 // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
260 // Debug
261 // RfNodeInfo* info = GetNodeInfo();
262 //info->nevent_in = nevt++;
263 // info->nqueue_in = nevt;
264 // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
265 }
266}
267
268void RFEventServer::cleanup()
269{
270 printf("RFEventServer : cleaning up\n");
271 UnConfigure(NULL, NULL);
272 /*
273 kill ( m_pid_recv, SIGINT );
274 int status;
275 waitpid ( m_pid_recv, &status, 0 );
276 printf ( "RFEventServer : receiver terminated.\n" );
277 for ( int i=0; i<m_nnodes; i++ ) {
278 kill ( m_pid_sender[i], SIGINT );
279 waitpid ( m_pid_sender[i], &status, 0 );
280 printf ( "RFEventServer : sender [%d] terminated.\n", i );
281 }
282 */
283 printf("RFEventServer: Done. Exitting\n");
284 exit(-1);
285}
286
int Execute(char *script, int nargs, char **args)
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224