Belle II Software  release-08-01-10
RSocketLib.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <netdb.h>
14 #include <sys/socket.h>
15 #include <netinet/in.h>
16 #include <errno.h>
17 #include <sys/time.h>
18 #include <signal.h>
19 
20 #include "daq/dataflow/RSocketLib.h"
21 
22 #define MAXHOSTNAME 100
23 
24 using namespace Belle2;
25 
26 
27 // RSocketSend class
28 
29 RSocketSend::RSocketSend(u_short p)
30 {
31 
32  struct sockaddr_in sa;
33  bzero(&sa, sizeof(struct sockaddr_in));
34 
35  m_errno = 0;
36 
37  sa.sin_family = AF_INET;
38  sa.sin_port = htons(p);
39 
40  int s;
41  if ((s = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
42  m_errno = errno;
43  perror("SocketRecv::socket");
44  return;
45  }
46 
47  int optval = 1;
48  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
49 
50  int sizeval = D2_SOCKBUF_SIZE;
51  setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
52  setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
53 
54  if ((bind(s, (struct sockaddr*)&sa, sizeof(sa))) < 0) {
55  m_errno = errno;
56  perror("RSocketSend::bind");
57  return;
58  }
59 
60  m_sock = s;
61  m_port = (int)p;
62  m_sender = 0;
63  listen(s, 3);
64  printf("RSocketSend:: initialized, sock=%d\n", m_sock);
65 
66  return;
67 
68 }
69 
70 RSocketSend::~RSocketSend()
71 {
72  if (m_sender > 0) {
73  shutdown(m_sender, 2);
74  ::close(m_sender);
75  printf("RSocketSend:: receiving socket %d closed\n", m_sender);
76  }
77 
78  shutdown(m_sock, 2);
79  ::close(m_sock);
80  printf("RSocketSend:: connection socket %d closed\n", m_sock);
81 }
82 
83 int RSocketSend::accept()
84 {
85  m_errno = 0;
86  struct sockaddr_in isa;
87  socklen_t i = sizeof(isa);
88  getsockname(m_sock, (struct sockaddr*)&isa, &i);
89 
90  int t;
91  if ((t =::accept(m_sock, (struct sockaddr*)&isa, &i)) < 0) {
92  m_errno = errno;
93  return (-1);
94  }
95 
96  printf("RSocketSend:: connection request accepted, sender=%d\n", t);
97 
98  m_sender = t;
99  return (t);
100 }
101 
102 int RSocketSend::close()
103 {
104  // ::close ( m_sender );
105  // m_sender = 0;
106  printf("RSocketSend: destructed, m_sender = %d\n", m_sender);
107  return 0;
108 }
109 
110 int RSocketSend::examine()
111 {
112  // printf("RSocketSend::examine(): waiting for client connection ...\n");
113 
114  m_errno = 0;
115  fd_set ready;
116  FD_ZERO(&ready);
117  FD_SET(m_sock, &ready);
118  struct timeval to;
119  to.tv_sec = 0;
120  to.tv_usec = 0;
121  if (select(FD_SETSIZE, &ready, 0, 0, &to) < 0) {
122  m_errno = errno;
123  perror("select");
124  return (-1);
125  }
126  if (FD_ISSET(m_sock, &ready)) {
127  printf("RSocketSend::connected!!!!\n");
128  return (1);
129  } else
130  return (0);
131 }
132 
133 int RSocketSend::get(char* data, int len)
134 {
135  // printf("SocketSend::get()\n");
136 
137  m_errno = 0;
138  if (m_sender > 0)
139  return m_io.get(m_sender, data, len);
140  else
141  return -1;
142 }
143 
144 int RSocketSend::read(char* data, int len)
145 {
146  m_errno = 0;
147  if (m_sender > 0)
148  return m_io.read_data(m_sender, data, len);
149  else
150  return -1;
151 }
152 
153 int RSocketSend::put(char* data, int len)
154 {
155  // printf("RSocketSend::put (sd = %d)\n", m_sender);
156  m_errno = 0;
157  if (m_sender > 0)
158  return m_io.put(m_sender, data, len);
159  else
160  return -1;
161 }
162 
163 int RSocketSend::put_wordbuf(int* data, int len)
164 {
165  // printf("RSocketSend::put (sd = %d)\n", m_sender);
166  m_errno = 0;
167  if (m_sender > 0)
168  return m_io.put_wordbuf(m_sender, data, len);
169  else
170  return -1;
171 }
172 
173 int RSocketSend::write(char* data, int len)
174 {
175  m_errno = 0;
176  if (m_sender > 0)
177  return m_io.write_data(m_sender, data, len);
178  else
179  return -1;
180 }
181 
182 
183 int RSocketSend::sock() const
184 {
185  return m_sock;
186 }
187 
188 void RSocketSend::sock(int sockid)
189 {
190  m_sock = sockid;
191 }
192 
193 int RSocketSend::sender() const
194 {
195  return m_sender;
196 }
197 
198 int RSocketSend::port() const
199 {
200  return m_port;
201 }
202 
203 int RSocketSend::err() const
204 {
205  return m_errno;
206 }
207 
208 void RSocketSend::interrupt()
209 {
210  m_io.interrupt();
211 }
212 
213 
214 // RSocketRecv class
215 
216 RSocketRecv::RSocketRecv(const char* node, u_short port)
217 {
218  m_errno = 0;
219  m_sock = -1;
220  // struct hostent* hp;
221  if ((m_hp = gethostbyname(node)) == NULL) {
222  m_errno = errno;
223  fprintf(stderr,
224  "RSocketRecv::gethostbyname(%s): not found\n", node);
225  return;
226  }
227 
228  // struct sockaddr_in sa;
229  bzero(&m_sa, sizeof(m_sa));
230  bcopy(m_hp->h_addr, (char*)&m_sa.sin_addr, m_hp->h_length);
231  m_sa.sin_family = m_hp->h_addrtype;
232  m_sa.sin_port = htons((u_short)port);
233 
234  int s;
235  m_sock = -1;
236  if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
237  m_errno = errno;
238  perror("RSocketRecv:socket");
239  return;
240  }
241  int sizeval = D2_SOCKBUF_SIZE;
242  setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
243  setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
244  int yes = 1;
245  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
246 
247  // Connect
248  int maxretry = 1000;
249  for (;;) {
250  printf("SocketRecv: connect trying %d times\n", 1000 - maxretry + 1);
251  int istat = connect(s, (struct sockaddr*)&m_sa, sizeof(m_sa));
252  if (istat >= 0) {
253  m_sock = s;
254  m_port = port;
255  strcpy(m_node, node);
256  signal(SIGPIPE, SIG_IGN);
257  printf("RSocketRecv: initialized, m_sock = %d\n", s);
258  return;
259  }
260  maxretry--;
261  if (maxretry == 0) {
262  printf("RSocketRecv: connection failed. exitting\n");
263  exit(-1);
264  }
265  sleep(5);
266  }
267 
268  /*
269  int maxretry = 10;
270  tryagain:
271  if (connect(s, (struct sockaddr*)&m_sa, sizeof(m_sa)) < 0) {
272  m_errno = errno;
273  perror("RSocketRecv:connect");
274  printf("tried to connect to %s, port %d\n", node, port);
275  printf("connection error..... m_sock set to %d\n", m_sock);
276  if (m_errno == ETIMEDOUT) {
277  printf(".... try again after 5 sec. \n");
278  maxretry--;
279  sleep(5);
280  if (maxretry == 0) return;
281  goto tryagain;
282  }
283  return;
284  }
285 
286  m_sock = s;
287  m_port = port;
288  strcpy(m_node, node);
289  signal(SIGPIPE, SIG_IGN);
290  printf("RSocketRecv: initialized, m_sock = %d\n", s);
291  */
292 }
293 
294 RSocketRecv::~RSocketRecv()
295 {
296  shutdown(m_sock, 2);
297  ::close(m_sock);
298  printf("RSocketRecv: destructed, m_sock = %d\n", m_sock);
299 }
300 
301 int RSocketRecv::reconnect(int ntry)
302 {
303  // Close existing socket once.
304  shutdown(m_sock, 2);
305  ::close(m_sock);
306 
307  // Setup socket parameters again;
308  bzero(&m_sa, sizeof(m_sa));
309  bcopy(m_hp->h_addr, (char*)&m_sa.sin_addr, m_hp->h_length);
310  m_sa.sin_family = m_hp->h_addrtype;
311  m_sa.sin_port = htons((u_short)m_port);
312 
313  // Reopen the socket
314  int s;
315  m_sock = -1;
316  if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
317  m_errno = errno;
318  perror("RSocketRecv:socket");
319  return -3;
320  }
321  int sizeval = D2_SOCKBUF_SIZE;
322  setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
323  setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
324  int yes = 1;
325  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
326 
327  m_sock = s;
328 
329  // Connect again
330  int maxretry = ntry;
331  // printf ("RSocketRecv: reconnecting socket %d, try %d times with 5sec. interval.\n", m_sock, ntry );
332 
333  for (;;) {
334  printf("RSocketRecv: reconnecting (trial %d) \n", ntry - maxretry + 1);
335  int istat = connect(m_sock, (struct sockaddr*)&m_sa, sizeof(m_sa));
336  if (istat >= 0) {
337  printf("RSocketRecv: reconnected\n");
338  return 0;
339  }
340  maxretry--;
341  if (maxretry == 0) return -1;
342  sleep(5);
343  }
344  printf("RSocketRecv: m_sock = %d reconnected.\n", m_sock);
345 }
346 
347 int RSocketRecv::get(char* data, int len)
348 {
349  m_errno = 0;
350  return m_io.get(m_sock, data, len);
351 }
352 
353 int RSocketRecv::get_wordbuf(int* data, int len)
354 {
355  // printf("RSocketRecv::get_wordbuf()\n");
356 
357  m_errno = 0;
358  if (m_sock > 0)
359  return m_io.get_wordbuf(m_sock, data, len);
360  else
361  return -1;
362 }
363 
364 int RSocketRecv::read(char* data, int len)
365 {
366  m_errno = 0;
367  return m_io.read_data(m_sock, data, len);
368 }
369 
370 int RSocketRecv::put(char* data, int len)
371 {
372  m_errno = 0;
373  // printf("RSocketRecv::put (sd = %d)\n", m_sock);
374  return m_io.put(m_sock, data, len);
375 }
376 
377 int RSocketRecv::write(char* data, int len)
378 {
379  m_errno = 0;
380  return m_io.write_data(m_sock, data, len);
381 }
382 
383 char* RSocketRecv::node()
384 {
385  return m_node;
386 }
387 
388 int RSocketRecv::port()
389 {
390  return m_port;
391 }
392 
393 int RSocketRecv::sock()
394 {
395  return m_sock;
396 }
397 
398 void RSocketRecv::sock(int sockid)
399 {
400  m_sock = sockid;
401 }
402 
403 int RSocketRecv::err()
404 {
405  return m_errno;
406 }
407 
408 
409 
410 
411 
412 
413 
414 
Abstract base class for different kinds of events.