Belle II Software development
ERecoDistributor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include "daq/expreco/ERecoDistributor.h"
10
11#include <sys/stat.h>
12#include <sys/types.h>
13#include <sys/wait.h>
14#include <unistd.h>
15
16#include <csignal>
17#include <cstring>
18#include <iostream>
19
20#define RFEVSOUT stdout
21
22using namespace std;
23using namespace Belle2;
24
25ERecoDistributor::ERecoDistributor(string conffile)
26{
27 // 0. Initialize configuration manager
28 m_conf = new RFConf(conffile.c_str());
29 char* nodename = m_conf->getconf("distributor", "nodename");
30 // char nodename[256];
31 // gethostname ( nodename, sizeof(nodename) );
32
33 // 1. Set execution directory
34 string execdir = string(m_conf->getconf("system", "execdir_base")) + "/distributor";
35
36 mkdir(execdir.c_str(), 0755);
37 chdir(execdir.c_str());
38
39 // 2. Initialize local shared memory
40 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
41 string(m_conf->getconf("distributor", "nodename"));
42 m_shm = new RFSharedMem((char*)shmname.c_str());
43
44 // 3. Initialize process manager
45 m_proc = new RFProcessManager(nodename);
46
47 // 4. Initialize RingBuffers
48 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
49 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
50 string(m_conf->getconf("distributor", "ringbuffer"));
51 int rbufsize = m_conf->getconfi("distributor", "ringbuffersize");
52 m_rbufin = new RingBuffer(ringbuf.c_str(), rbufsize);
53
54 // 5. Initialize LogManager
55 m_log = new RFLogManager(nodename, m_conf->getconf("system", "lognode"));
56
57 // 6. Initialize data flow monitor
58 m_flow = new RFFlowStat((char*)shmname.c_str());
59
60}
61
62ERecoDistributor::~ERecoDistributor()
63{
64 delete m_log;
65 delete m_proc;
66 delete m_shm;
67 delete m_conf;
68 delete m_flow;
69 delete m_rbufin;
70}
71
72
73// Functions hooked up by NSM2
74
75int ERecoDistributor::Configure(NSMmsg*, NSMcontext*)
76{
77 // 0. Global parameters
78 // char* ringbuf = m_conf->getconf("distributor", "ringbuffer");
79 string ringbuf = string(m_conf->getconf("system", "unitname")) + ":" +
80 string(m_conf->getconf("distributor", "ringbuffer"));
81
82 // char* shmname = m_conf->getconf("distributor", "nodename");
83 string shmname = string(m_conf->getconf("system", "unitname")) + ":" +
84 string(m_conf->getconf("distributor", "nodename"));
85
86 // 1. Run sender
87 m_nnodes = 0;
88 int maxnodes = m_conf->getconfi("processor", "nnodes");
89 int idbase = m_conf->getconfi("processor", "idbase");
90 //char* hostbase = m_conf->getconf("processor", "hostbase");
91 char* badlist = m_conf->getconf("processor", "badlist");
92
93 char* sender = m_conf->getconf("distributor", "sender", "script");
94 int portbase = m_conf->getconfi("distributor", "sender", "portbase");
95
96
97 char hostname[512], idname[3], shmid[3];
98 for (int i = 0; i < maxnodes; i++) {
99 sprintf(idname, "%2.2d", idbase + i);
100 sprintf(shmid, "%2.2d", i);
101 if (badlist == NULL ||
102 strstr(badlist, idname) == 0) {
103 int port = (idbase + i) + portbase;
104 char portchar[256];
105 sprintf(portchar, "%d", port);
106 m_pid_sender[m_nnodes] = m_proc->Execute(sender, (char*)ringbuf.c_str(), portchar, (char*)shmname.c_str(), shmid);
107 printf("Running sender to %d\n", port);
108 fflush(stdout);
109 m_nnodes++;
110 }
111 }
112
113 // 2. Run receiver
114 m_nrecv = 0;
115 int maxrecv = m_conf->getconfi("distributor", "receiver", "nnodes");
116 int ridbase = m_conf->getconfi("distributor", "receiver", "idbase");
117 char* rhostbase = m_conf->getconf("distributor", "receiver", "hostbase");
118 char* rbadlist = m_conf->getconf("distributor", "receiver", "badlist");
119 char* port = m_conf->getconf("distributor", "receiver", "port");
120 char* receiver = m_conf->getconf("distributor", "receiver", "script");
121
122 // char hostname[512], idname[3], shmid[3];
123 for (int i = 0; i < maxrecv; i++) {
124 sprintf(hostname, "%s%2.2d", rhostbase, ridbase + i);
125 sprintf(idname, "%2.2d", ridbase + i);
126 sprintf(shmid, "%2.2d", i + 20);
127 if (rbadlist == NULL ||
128 strstr(rbadlist, hostname) == 0) {
129 printf("Running receiver for %s\n", hostname);
130 fflush(stdout);
131 m_pid_recv[m_nrecv] = m_proc->Execute(receiver, (char*)ringbuf.c_str(), hostname, port, (char*)shmname.c_str(), shmid);
132 m_nrecv++;
133 }
134 }
135
136 printf("ERecoDistributor : Configure done\n");
137 return 0;
138}
139
140int ERecoDistributor::UnConfigure(NSMmsg*, NSMcontext*)
141{
142 // system("killall sock2rbr rb2sockr");
143 int status;
144
145 printf("m_nrecv = %d\n", m_nrecv);
146 for (int i = 0; i < m_nrecv; i++) {
147 if (m_pid_recv[i] != 0) {
148 printf("ERecoDistributor : killing receiver pid=%d\n", m_pid_recv[i]);
149 kill(m_pid_recv[i], SIGKILL);
150 waitpid(m_pid_recv[i], &status, 0);
151 }
152 }
153
154 printf("m_nnodes = %d\n", m_nnodes);
155 for (int i = 0; i < m_nnodes; i++) {
156 if (m_pid_sender[i] != 0) {
157 printf("ERecoDistributor : killing sender pid=%d\n", m_pid_sender[i]);
158 // kill(m_pid_sender[i], SIGINT);
159 kill(m_pid_sender[i], SIGKILL);
160 waitpid(m_pid_sender[i], &status, 0);
161 }
162 }
163 // Clear RingBuffer
164 m_rbufin->forceClear();
165
166 // Clear process list
167 m_flow->fillProcessStatus(GetNodeInfo());
168
169 printf("ERecoDistributor : Unconfigure done\n");
170 return 0;
171}
172
173int ERecoDistributor::Start(NSMmsg*, NSMcontext*)
174{
175 m_rbufin->clear();
176 return 0;
177}
178
179int ERecoDistributor::Stop(NSMmsg*, NSMcontext*)
180{
181 // m_rbufin->clear();
182 return 0;
183}
184
185
186int ERecoDistributor::Restart(NSMmsg*, NSMcontext*)
187{
188 printf("ERecoDistributor : Restarting!!!!!\n");
189 NSMmsg* nsmmsg = NULL;
190 NSMcontext* nsmcontext = NULL;
191 ERecoDistributor::UnConfigure(nsmmsg, nsmcontext);
192 sleep(2);
193 ERecoDistributor::Configure(nsmmsg, nsmcontext);
194 return 0;
195}
196
197// Server function
198
199void ERecoDistributor::server()
200{
201 // int nevt = 0;
202 m_flow->fillProcessStatus(GetNodeInfo());
203
204 while (true) {
205 pid_t pid = m_proc->CheckProcess();
206 if (pid > 0) {
207 printf("ERecoDistributor : process dead. pid = %d\n", pid);
208 for (int i = 0; i < m_nrecv; i++) {
209 if (pid == m_pid_recv[i]) {
210 m_log->Fatal("ERecoDistributor : receiver process (%d) dead. pid=%d\n", i, pid);
211 m_pid_recv[i] = 0;
212 }
213 }
214 for (int i = 0; i < m_nnodes; i++) {
215 if (pid == m_pid_sender[i]) {
216 m_log->Fatal("ERecoDistributor : sender process (%d) dead. pid=%d\n", i, m_pid_sender[i]);
217 m_pid_sender[i] = 0;
218 }
219 }
220 }
221
222 int st = m_proc->CheckOutput();
223 if (st < 0) {
224 perror("ERecoDistributor::server");
225 // exit ( -1 );
226 } else if (st > 0) {
227 m_log->ProcessLog(m_proc->GetFd());
228 }
229 }
230 // m_flow->fillNodeInfo(RF_INPUT_ID, GetNodeInfo(), false);
231 // m_flow->fillProcessStatus(GetNodeInfo(), m_pid_recv, m_pid_sender[sender_id]);
232 // m_flow->fillNodeInfo(0, GetNodeInfo(), false);
233 // Debug
234 // RfNodeInfo* info = GetNodeInfo();
235 //info->nevent_in = nevt++;
236 // info->nqueue_in = nevt;
237 // printf ( "FillNodeInfo called!! info->nevent_in = %d\n", info->nevent_in );
238}
239
240
241
242
int Execute(char *script, int nargs, char **args)
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441
Abstract base class for different kinds of events.
STL namespace.
Definition: nsm2.h:224