Belle II Software development
HLTSocket.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/rfarm/event/hltsocket/HLTSocket.h>
9#include <framework/logging/Logger.h>
10
11#include <unistd.h>
12#include <stdio.h>
13#include <sys/types.h>
14#include <netdb.h>
15#include <sys/socket.h>
16#include <netinet/in.h>
17#include <errno.h>
18#include <signal.h>
19
20#include <chrono>
21#include <thread>
22
23using namespace Belle2;
24
25HLTSocket::~HLTSocket()
26{
27 close(m_listener);
28 close(m_socket);
29 m_initialized = false;
30}
31
32int HLTSocket::put(char* data, int len)
33{
34 const int to_size = htonl(len);
35 const int br = write_data((char*) &to_size, sizeof(int));
36 if (br == 0) {
37 B2ERROR("Can not write to broken pipe.");
38 return 0;
39 } else if (br < 0) {
40 B2ERROR("Error in sending the size: " << strerror(errno));
41 return br;
42 }
43 const int bcount = write_data(data, len);
44 if (bcount == 0) {
45 B2ERROR("Can not write to broken pipe.");
46 return 0;
47 } else if (bcount < 0) {
48 B2ERROR("Error in sending the data: " << strerror(errno));
49 return bcount;
50 }
51 B2ASSERT("Written buffer size != buffer size in data!", bcount == len);
52 return bcount;
53}
54
55int HLTSocket::put_wordbuf(int* data, int len)
56{
57 // TODO: it is implicitly assumed that the first entry in the buffer is the buffer size!
58 const int gcount = data[0];
59 B2ASSERT("The first entry in the data must be the buffer size!", gcount == len);
60
61 int bcount = write_data((char*) data, len * sizeof(int));
62 if (bcount == 0) {
63 B2ERROR("Can not write to broken pipe.");
64 return 0;
65 } else if (bcount < 0) {
66 B2ERROR("Error in sending the data: " << strerror(errno));
67 return bcount;
68 }
69 bcount = ((bcount - 1) / sizeof(int) + 1);
70
71 B2ASSERT("Written buffer size != buffer size in data!", bcount == len);
72
73 // ATTENTION: the returned size is size / 4
74 return bcount;
75}
76
77int HLTSocket::write_data(char* data, int len)
78{
79 errno = 0;
80 char* ptr = data;
81 int bcount = 0;
82
83 while (bcount < len) {
84 int br = 0;
85 if ((br = ::write(m_socket, ptr, len - bcount)) > 0) {
86 bcount += br;
87 ptr += br;
88 }
89 if (br < 0) {
90 switch (errno) {
91 case EINTR:
92 return -1;
93 case EPIPE:
94 return 0; // connection closed, sigpipe
95 default:
96 return -1;
97 }
98 }
99 }
100 return bcount;
101}
102
103int HLTSocket::get(char* data, int len)
104{
105 int gcount;
106 const int br = read_data((char*) &gcount, 4);
107 if (br < 0) {
108 B2ERROR("Error in getting the size: " << strerror(errno));
109 return br;
110 } else if (br == 0) {
111 B2WARNING("No data from socket!");
112 return 0;
113 }
114
115 gcount = ntohl(gcount);
116 if (gcount > len) {
117 B2ERROR("buffer too small! " << gcount << " < " << len);
118 return -1;
119 }
120 const int bcount = read_data(data, gcount);
121 if (bcount < 0) {
122 B2ERROR("Error in getting the data: " << strerror(errno));
123 return bcount;
124 } else if (bcount == 0) {
125 B2WARNING("No data from socket!");
126 return 0;
127 }
128 B2ASSERT("Read buffer size != buffer size in data!", bcount == gcount);
129 return bcount;
130}
131
132int HLTSocket::get_wordbuf(int* wrdbuf, int len)
133{
134 int br = read_data((char*) wrdbuf, sizeof(int));
135 if (br < 0) {
136 B2ERROR("Error in getting the size: " << strerror(errno));
137 return br;
138 } else if (br == 0) {
139 B2WARNING("No data from socket!");
140 return 0;
141 }
142
143 const int gcount = (wrdbuf[0] - 1) * sizeof(int);
144 if (gcount > len) {
145 B2ERROR("buffer too small! " << gcount << " < " << len);
146 return -1;
147 }
148 // ATTENTION: the send size is size / 4
149 const int bcount = read_data((char*) &wrdbuf[1], gcount);
150 if (bcount < 0) {
151 B2ERROR("Error in getting the data: " << strerror(errno));
152 return bcount;
153 } else if (bcount == 0) {
154 B2WARNING("No data from socket!");
155 return 0;
156 }
157
158 B2ASSERT("Read buffer size != buffer size in data: " << bcount << " != " << gcount, bcount == gcount);
159 return (wrdbuf[0]);
160}
161
162int HLTSocket::read_data(char* data, int len)
163{
164 char* buf = data;
165 int bcount = 0;
166
167 while (bcount < len) {
168 int br = 0;
169 if ((br = ::read(m_socket, buf, len - bcount)) > 0) {
170 bcount += br;
171 buf += br;
172 }
173 if (br == 0) return 0;
174 if (br < 0) {
175 switch (errno) {
176 case EINTR:
177 return -1;
178 case EAGAIN:
179 continue;
180 default:
181 return -1;
182 }
183 }
184 }
185 return bcount;
186}
187
188bool HLTSocket::accept(unsigned int port)
189{
190 // Before going on make sure to close the old socket
191 close(m_socket);
192
193 // Initialize the first listener
194 if (m_listener <= 0) {
195 B2RESULT("Started listening for new clients");
196 struct sockaddr_in sa;
197 bzero(&sa, sizeof(struct sockaddr_in));
198
199 sa.sin_family = AF_INET;
200 sa.sin_port = htons(port);
201
202 if ((m_listener = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
203 B2ERROR("Socket initialization failed: " << strerror(errno));
204 return false;
205 }
206
207 int optval = 1;
208 setsockopt(m_listener, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
209
210 int sizeval = D2_SOCKBUF_SIZE;
211 setsockopt(m_listener, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
212 setsockopt(m_listener, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
213
214 signal(SIGPIPE, SIG_IGN);
215
216 if ((bind(m_listener, (struct sockaddr*) &sa, sizeof(sa))) < 0) {
217 B2ERROR("Socket binding failed: " << strerror(errno));
218 return false;
219 }
220
221 listen(m_listener, 3);
222 }
223
224 struct sockaddr_in isa;
225 socklen_t i = sizeof(isa);
226 getsockname(m_listener, (struct sockaddr*)&isa, &i);
227
228 B2RESULT("Started accepting new clients");
229 if ((m_socket =::accept(m_listener, (struct sockaddr*)&isa, &i)) < 0) {
230 B2ERROR("Socket accepting failed: " << strerror(errno));
231 return false;
232 }
233
234 B2RESULT("Accepted connection with socket: " << m_socket);
235 m_initialized = true;
236 return true;
237}
238
239void HLTSocket::close(int socket)
240{
241 if (socket > 0) {
242 shutdown(socket, 2);
243 ::close(socket);
244 B2RESULT("Socket closed: " << socket);
245 }
246}
247
248bool HLTSocket::connect(const std::string& hostName, unsigned int port, const HLTMainLoop& mainLoop)
249{
250 // Before going on make sure to close the old socket
251 close(m_socket);
252
253 struct hostent* hp;
254 if ((hp = gethostbyname(hostName.c_str())) == NULL) {
255 B2ERROR("Host not found: " << strerror(errno));
256 return false;
257 }
258
259 struct sockaddr_in sa;
260 bzero(&sa, sizeof(sa));
261 bcopy(hp->h_addr, (char*) &sa.sin_addr, hp->h_length);
262 sa.sin_family = hp->h_addrtype;
263 sa.sin_port = htons((u_short) port);
264
265 if ((m_socket = socket(hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
266 B2ERROR("Socket initialization failed: " << strerror(errno));
267 return false;
268 }
269
270 int sizeval = D2_SOCKBUF_SIZE;
271 setsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
272 setsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
273 int yes = 1;
274 setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
275
276 signal(SIGPIPE, SIG_IGN);
277
278 using namespace std::chrono_literals;
279
280 int maxretry = 0;
281 while (::connect(m_socket, (struct sockaddr*) &sa, sizeof(sa)) < 0) {
282 if (errno == ETIMEDOUT or errno == ECONNREFUSED) {
283 B2WARNING("Connection failed, will retry in 1 second... " << maxretry);
284 std::this_thread::sleep_for(1s);
285 } else {
286 B2ERROR("Socket initialization failed: " << strerror(errno));
287 return false;
288 }
289
290 if (not mainLoop.isRunning()) {
291 return false;
292 }
293 maxretry++;
294 }
295
296 B2RESULT("Connected with socket: " << m_socket);
297 m_initialized = true;
298 return true;
299}
300
301bool HLTSocket::initialized() const
302{
303 return m_initialized;
304}
305void HLTSocket::deinitialize()
306{
307 m_initialized = false;
308}
Abstract base class for different kinds of events.