Belle II Software development
RSocketLib.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <unistd.h>
10#include <stdlib.h>
11#include <stdio.h>
12#include <string.h>
13#include <netdb.h>
14#include <sys/socket.h>
15#include <netinet/in.h>
16#include <errno.h>
17#include <sys/time.h>
18#include <signal.h>
19
20#include "daq/dataflow/RSocketLib.h"
21
22#define MAXHOSTNAME 100
23
24using namespace Belle2;
25
26
27// RSocketSend class
28
29RSocketSend::RSocketSend(u_short p)
30{
31
32 struct sockaddr_in sa;
33 bzero(&sa, sizeof(struct sockaddr_in));
34
35 m_errno = 0;
36
37 sa.sin_family = AF_INET;
38 sa.sin_port = htons(p);
39
40 int s;
41 if ((s = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
42 m_errno = errno;
43 perror("SocketRecv::socket");
44 return;
45 }
46
47 int optval = 1;
48 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
49
50 int sizeval = D2_SOCKBUF_SIZE;
51 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
52 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
53
54 if ((bind(s, (struct sockaddr*)&sa, sizeof(sa))) < 0) {
55 m_errno = errno;
56 perror("RSocketSend::bind");
57 return;
58 }
59
60 m_sock = s;
61 m_port = (int)p;
62 m_sender = 0;
63 listen(s, 3);
64 printf("RSocketSend:: initialized, sock=%d\n", m_sock);
65
66 return;
67
68}
69
70RSocketSend::~RSocketSend()
71{
72 if (m_sender > 0) {
73 shutdown(m_sender, 2);
74 ::close(m_sender);
75 printf("RSocketSend:: receiving socket %d closed\n", m_sender);
76 }
77
78 shutdown(m_sock, 2);
79 ::close(m_sock);
80 printf("RSocketSend:: connection socket %d closed\n", m_sock);
81}
82
83int RSocketSend::accept()
84{
85 m_errno = 0;
86 struct sockaddr_in isa;
87 socklen_t i = sizeof(isa);
88 getsockname(m_sock, (struct sockaddr*)&isa, &i);
89
90 int t;
91 if ((t =::accept(m_sock, (struct sockaddr*)&isa, &i)) < 0) {
92 m_errno = errno;
93 return (-1);
94 }
95
96 printf("RSocketSend:: connection request accepted, sender=%d\n", t);
97
98 m_sender = t;
99 return (t);
100}
101
102int RSocketSend::close()
103{
104 // ::close ( m_sender );
105 // m_sender = 0;
106 printf("RSocketSend: destructed, m_sender = %d\n", m_sender);
107 return 0;
108}
109
110int RSocketSend::examine()
111{
112 // printf("RSocketSend::examine(): waiting for client connection ...\n");
113
114 m_errno = 0;
115 fd_set ready;
116 FD_ZERO(&ready);
117 FD_SET(m_sock, &ready);
118 struct timeval to;
119 to.tv_sec = 0;
120 to.tv_usec = 0;
121 if (select(FD_SETSIZE, &ready, 0, 0, &to) < 0) {
122 m_errno = errno;
123 perror("select");
124 return (-1);
125 }
126 if (FD_ISSET(m_sock, &ready)) {
127 printf("RSocketSend::connected!!!!\n");
128 return (1);
129 } else
130 return (0);
131}
132
133int RSocketSend::get(char* data, int len)
134{
135 // printf("SocketSend::get()\n");
136
137 m_errno = 0;
138 if (m_sender > 0)
139 return m_io.get(m_sender, data, len);
140 else
141 return -1;
142}
143
144int RSocketSend::read(char* data, int len)
145{
146 m_errno = 0;
147 if (m_sender > 0)
148 return m_io.read_data(m_sender, data, len);
149 else
150 return -1;
151}
152
153int RSocketSend::put(char* data, int len)
154{
155 // printf("RSocketSend::put (sd = %d)\n", m_sender);
156 m_errno = 0;
157 if (m_sender > 0)
158 return m_io.put(m_sender, data, len);
159 else
160 return -1;
161}
162
163int RSocketSend::put_wordbuf(int* data, int len)
164{
165 // printf("RSocketSend::put (sd = %d)\n", m_sender);
166 m_errno = 0;
167 if (m_sender > 0)
168 return m_io.put_wordbuf(m_sender, data, len);
169 else
170 return -1;
171}
172
173int RSocketSend::write(char* data, int len)
174{
175 m_errno = 0;
176 if (m_sender > 0)
177 return m_io.write_data(m_sender, data, len);
178 else
179 return -1;
180}
181
182
183int RSocketSend::sock() const
184{
185 return m_sock;
186}
187
188void RSocketSend::sock(int sockid)
189{
190 m_sock = sockid;
191}
192
193int RSocketSend::sender() const
194{
195 return m_sender;
196}
197
198int RSocketSend::port() const
199{
200 return m_port;
201}
202
203int RSocketSend::err() const
204{
205 return m_errno;
206}
207
208void RSocketSend::interrupt()
209{
210 m_io.interrupt();
211}
212
213
214// RSocketRecv class
215
216RSocketRecv::RSocketRecv(const char* node, u_short port)
217{
218 m_errno = 0;
219 m_sock = -1;
220 // struct hostent* hp;
221 if ((m_hp = gethostbyname(node)) == NULL) {
222 m_errno = errno;
223 fprintf(stderr,
224 "RSocketRecv::gethostbyname(%s): not found\n", node);
225 return;
226 }
227
228 // struct sockaddr_in sa;
229 bzero(&m_sa, sizeof(m_sa));
230 bcopy(m_hp->h_addr, (char*)&m_sa.sin_addr, m_hp->h_length);
231 m_sa.sin_family = m_hp->h_addrtype;
232 m_sa.sin_port = htons((u_short)port);
233
234 int s;
235 m_sock = -1;
236 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
237 m_errno = errno;
238 perror("RSocketRecv:socket");
239 return;
240 }
241 int sizeval = D2_SOCKBUF_SIZE;
242 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
243 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
244 int yes = 1;
245 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
246
247 // Connect
248 int maxretry = 1000;
249 for (;;) {
250 printf("SocketRecv: connect trying %d times\n", 1000 - maxretry + 1);
251 int istat = connect(s, (struct sockaddr*)&m_sa, sizeof(m_sa));
252 if (istat >= 0) {
253 m_sock = s;
254 m_port = port;
255 strcpy(m_node, node);
256 signal(SIGPIPE, SIG_IGN);
257 printf("RSocketRecv: initialized, m_sock = %d\n", s);
258 return;
259 }
260 maxretry--;
261 if (maxretry == 0) {
262 printf("RSocketRecv: connection failed. exitting\n");
263 exit(-1);
264 }
265 sleep(5);
266 }
267
268 /*
269 int maxretry = 10;
270 tryagain:
271 if (connect(s, (struct sockaddr*)&m_sa, sizeof(m_sa)) < 0) {
272 m_errno = errno;
273 perror("RSocketRecv:connect");
274 printf("tried to connect to %s, port %d\n", node, port);
275 printf("connection error..... m_sock set to %d\n", m_sock);
276 if (m_errno == ETIMEDOUT) {
277 printf(".... try again after 5 sec. \n");
278 maxretry--;
279 sleep(5);
280 if (maxretry == 0) return;
281 goto tryagain;
282 }
283 return;
284 }
285
286 m_sock = s;
287 m_port = port;
288 strcpy(m_node, node);
289 signal(SIGPIPE, SIG_IGN);
290 printf("RSocketRecv: initialized, m_sock = %d\n", s);
291 */
292}
293
294RSocketRecv::~RSocketRecv()
295{
296 shutdown(m_sock, 2);
297 ::close(m_sock);
298 printf("RSocketRecv: destructed, m_sock = %d\n", m_sock);
299}
300
301int RSocketRecv::reconnect(int ntry)
302{
303 // Close existing socket once.
304 shutdown(m_sock, 2);
305 ::close(m_sock);
306
307 // Setup socket parameters again;
308 bzero(&m_sa, sizeof(m_sa));
309 bcopy(m_hp->h_addr, (char*)&m_sa.sin_addr, m_hp->h_length);
310 m_sa.sin_family = m_hp->h_addrtype;
311 m_sa.sin_port = htons((u_short)m_port);
312
313 // Reopen the socket
314 int s;
315 m_sock = -1;
316 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
317 m_errno = errno;
318 perror("RSocketRecv:socket");
319 return -3;
320 }
321 int sizeval = D2_SOCKBUF_SIZE;
322 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
323 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
324 int yes = 1;
325 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
326
327 m_sock = s;
328
329 // Connect again
330 int maxretry = ntry;
331 // printf ("RSocketRecv: reconnecting socket %d, try %d times with 5sec. interval.\n", m_sock, ntry );
332
333 for (;;) {
334 printf("RSocketRecv: reconnecting (trial %d) \n", ntry - maxretry + 1);
335 int istat = connect(m_sock, (struct sockaddr*)&m_sa, sizeof(m_sa));
336 if (istat >= 0) {
337 printf("RSocketRecv: reconnected\n");
338 return 0;
339 }
340 maxretry--;
341 if (maxretry == 0) return -1;
342 sleep(5);
343 }
344 printf("RSocketRecv: m_sock = %d reconnected.\n", m_sock);
345}
346
347int RSocketRecv::get(char* data, int len)
348{
349 m_errno = 0;
350 return m_io.get(m_sock, data, len);
351}
352
353int RSocketRecv::get_wordbuf(int* data, int len)
354{
355 // printf("RSocketRecv::get_wordbuf()\n");
356
357 m_errno = 0;
358 if (m_sock > 0)
359 return m_io.get_wordbuf(m_sock, data, len);
360 else
361 return -1;
362}
363
364int RSocketRecv::read(char* data, int len)
365{
366 m_errno = 0;
367 return m_io.read_data(m_sock, data, len);
368}
369
370int RSocketRecv::put(char* data, int len)
371{
372 m_errno = 0;
373 // printf("RSocketRecv::put (sd = %d)\n", m_sock);
374 return m_io.put(m_sock, data, len);
375}
376
377int RSocketRecv::write(char* data, int len)
378{
379 m_errno = 0;
380 return m_io.write_data(m_sock, data, len);
381}
382
383char* RSocketRecv::node()
384{
385 return m_node;
386}
387
388int RSocketRecv::port()
389{
390 return m_port;
391}
392
393int RSocketRecv::sock()
394{
395 return m_sock;
396}
397
398void RSocketRecv::sock(int sockid)
399{
400 m_sock = sockid;
401}
402
403int RSocketRecv::err()
404{
405 return m_errno;
406}
407
408
409
410
411
412
413
414
Abstract base class for different kinds of events.