14#include <sys/socket.h>
15#include <netinet/in.h>
20#include "daq/dataflow/RSocketLib.h"
22#define MAXHOSTNAME 100
29RSocketSend::RSocketSend(u_short p)
32 struct sockaddr_in sa;
33 bzero(&sa,
sizeof(
struct sockaddr_in));
37 sa.sin_family = AF_INET;
38 sa.sin_port = htons(p);
41 if ((s = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
43 perror(
"SocketRecv::socket");
48 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
50 int sizeval = D2_SOCKBUF_SIZE;
51 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
52 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
54 if ((bind(s, (
struct sockaddr*)&sa,
sizeof(sa))) < 0) {
56 perror(
"RSocketSend::bind");
64 printf(
"RSocketSend:: initialized, sock=%d\n", m_sock);
70RSocketSend::~RSocketSend()
73 shutdown(m_sender, 2);
75 printf(
"RSocketSend:: receiving socket %d closed\n", m_sender);
80 printf(
"RSocketSend:: connection socket %d closed\n", m_sock);
83int RSocketSend::accept()
86 struct sockaddr_in isa;
87 socklen_t i =
sizeof(isa);
88 getsockname(m_sock, (
struct sockaddr*)&isa, &i);
91 if ((t =::accept(m_sock, (
struct sockaddr*)&isa, &i)) < 0) {
96 printf(
"RSocketSend:: connection request accepted, sender=%d\n", t);
102int RSocketSend::close()
106 printf(
"RSocketSend: destructed, m_sender = %d\n", m_sender);
110int RSocketSend::examine()
117 FD_SET(m_sock, &ready);
121 if (select(FD_SETSIZE, &ready, 0, 0, &to) < 0) {
126 if (FD_ISSET(m_sock, &ready)) {
127 printf(
"RSocketSend::connected!!!!\n");
133int RSocketSend::get(
char* data,
int len)
139 return m_io.get(m_sender, data, len);
144int RSocketSend::read(
char* data,
int len)
148 return m_io.read_data(m_sender, data, len);
153int RSocketSend::put(
char* data,
int len)
158 return m_io.put(m_sender, data, len);
163int RSocketSend::put_wordbuf(
int* data,
int len)
168 return m_io.put_wordbuf(m_sender, data, len);
173int RSocketSend::write(
char* data,
int len)
177 return m_io.write_data(m_sender, data, len);
183int RSocketSend::sock()
const
188void RSocketSend::sock(
int sockid)
193int RSocketSend::sender()
const
198int RSocketSend::port()
const
203int RSocketSend::err()
const
208void RSocketSend::interrupt()
216RSocketRecv::RSocketRecv(
const char* node, u_short port)
221 if ((m_hp = gethostbyname(node)) == NULL) {
224 "RSocketRecv::gethostbyname(%s): not found\n", node);
229 bzero(&m_sa,
sizeof(m_sa));
230 bcopy(m_hp->h_addr, (
char*)&m_sa.sin_addr, m_hp->h_length);
231 m_sa.sin_family = m_hp->h_addrtype;
232 m_sa.sin_port = htons((u_short)port);
236 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
238 perror(
"RSocketRecv:socket");
241 int sizeval = D2_SOCKBUF_SIZE;
242 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
243 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
245 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
250 printf(
"SocketRecv: connect trying %d times\n", 1000 - maxretry + 1);
251 int istat = connect(s, (
struct sockaddr*)&m_sa,
sizeof(m_sa));
255 strcpy(m_node, node);
256 signal(SIGPIPE, SIG_IGN);
257 printf(
"RSocketRecv: initialized, m_sock = %d\n", s);
262 printf(
"RSocketRecv: connection failed. exitting\n");
294RSocketRecv::~RSocketRecv()
298 printf(
"RSocketRecv: destructed, m_sock = %d\n", m_sock);
301int RSocketRecv::reconnect(
int ntry)
308 bzero(&m_sa,
sizeof(m_sa));
309 bcopy(m_hp->h_addr, (
char*)&m_sa.sin_addr, m_hp->h_length);
310 m_sa.sin_family = m_hp->h_addrtype;
311 m_sa.sin_port = htons((u_short)m_port);
316 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
318 perror(
"RSocketRecv:socket");
321 int sizeval = D2_SOCKBUF_SIZE;
322 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
323 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
325 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
334 printf(
"RSocketRecv: reconnecting (trial %d) \n", ntry - maxretry + 1);
335 int istat = connect(m_sock, (
struct sockaddr*)&m_sa,
sizeof(m_sa));
337 printf(
"RSocketRecv: reconnected\n");
341 if (maxretry == 0)
return -1;
344 printf(
"RSocketRecv: m_sock = %d reconnected.\n", m_sock);
347int RSocketRecv::get(
char* data,
int len)
350 return m_io.get(m_sock, data, len);
353int RSocketRecv::get_wordbuf(
int* data,
int len)
359 return m_io.get_wordbuf(m_sock, data, len);
364int RSocketRecv::read(
char* data,
int len)
367 return m_io.read_data(m_sock, data, len);
370int RSocketRecv::put(
char* data,
int len)
374 return m_io.put(m_sock, data, len);
377int RSocketRecv::write(
char* data,
int len)
380 return m_io.write_data(m_sock, data, len);
383char* RSocketRecv::node()
388int RSocketRecv::port()
393int RSocketRecv::sock()
398void RSocketRecv::sock(
int sockid)
403int RSocketRecv::err()
Abstract base class for different kinds of events.