14 #include <sys/socket.h>
15 #include <netinet/in.h>
20 #include "daq/dataflow/SocketLib.h"
22 #define MAXHOSTNAME 100
36 int SocketIO::put(
int sock,
char* data,
int len)
38 int to_size = htonl(len);
40 int br = write_data(sock, (
char*)&to_size, 4);
42 perror(
"put: sending size");
46 int bcount = write_data(sock, data, len);
47 if (bcount < 0) perror(
"put: sending data");
51 int SocketIO::put_wordbuf(
int sock,
int* data,
int len)
54 int bcount = write_data(sock, (
char*)data, len *
sizeof(
int));
55 if (bcount < 0) perror(
"put: sending data");
59 return ((bcount - 1) / 4 + 1);
62 int SocketIO::write_data(
int sock,
char* data,
int len)
70 while (bcount < len) {
72 if ((br =::write(sock, ptr, len - bcount)) > 0) {
95 int SocketIO::get(
int sock,
char* data,
int len)
98 int br = read_data(sock, (
char*)&gcount, 4);
99 if (br <= 0)
return br;
100 gcount = ntohl(gcount);
102 printf(
"buffer too small : %d(%d)", gcount, len);
105 int bcount = read_data(sock, data, gcount);
109 int SocketIO::get_wordbuf(
int sock,
int* wrdbuf,
int len)
112 int br = read_data(sock, (
char*)wrdbuf,
sizeof(
int));
113 if (br <= 0)
return br;
118 printf(
"buffer too small : %d(%d)", gcount, len);
121 read_data(sock, (
char*)&wrdbuf[1], (gcount - 1) *
sizeof(
int));
128 #define MAX_PXD_FRAMES 256
129 const int headerlen = 8;
130 int* pxdheader = (
int*) data;
131 int* pxdheadertable = (
int*) &data[headerlen];
132 int framenr = 0, tablelen = 0, datalen = 0;
133 int br = read_data(sock, data, headerlen);
134 if (br <= 0)
return br;
135 if (pxdheader[0] != htonl(0xCAFEBABE)) {
136 printf(
"pxdheader wrong : Magic %X , Frames %X \n", pxdheader[0], ntohl(pxdheader[1]));
139 pxdheader[0] = ntohl(pxdheader[0]);
140 framenr = pxdheader[1] = ntohl(pxdheader[1]);
141 if (framenr > MAX_PXD_FRAMES) {
142 printf(
"MAX_PXD_FRAMES too small : %d(%d) \n", framenr, MAX_PXD_FRAMES);
145 tablelen = 4 * framenr;
146 br = read_data(sock, (
char*)&data[headerlen], tablelen);
147 if (br <= 0)
return br;
148 for (
int i = 0; i < framenr; i++) {
149 pxdheadertable[i] = ntohl(pxdheadertable[i]);
150 datalen += (pxdheadertable[i] + 3) & 0xFFFFFFFC;
153 if (datalen + headerlen + tablelen > len) {
154 printf(
"buffer too small : %d %d %d(%d) \n", headerlen, tablelen, datalen, len);
157 int bcount = read_data(sock, data + headerlen + tablelen, datalen);
158 if (br <= 0)
return br;
159 return (headerlen + tablelen + bcount);
162 int SocketIO::read_data(
int sock,
char* data,
int len)
167 while (bcount < len) {
169 if ((br =::read(sock, buf, len - bcount)) > 0) {
174 if (br == 0)
return 0;
179 printf(
"read: interrupted!\n");
188 perror(
"SocketIO:read");
189 fprintf(stderr,
"sock = %d, buf=%p, len = %d\n", sock, buf, len - bcount);
198 void SocketIO::interrupt(
void)
205 SocketRecv::SocketRecv(u_short p)
208 struct sockaddr_in sa;
209 bzero(&sa,
sizeof(
struct sockaddr_in));
213 sa.sin_family = AF_INET;
214 sa.sin_port = htons(p);
217 if ((s = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
219 perror(
"SocketRecv::socket");
224 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
226 int sizeval = D2_SOCKBUF_SIZE;
227 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
228 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
230 signal(SIGPIPE, SIG_IGN);
234 if ((bind(s, (
struct sockaddr*)&sa,
sizeof(sa))) < 0) {
236 perror(
"SocketRecv::bind");
243 printf(
"SocketRecv:: initialized, sock=%d\n", m_sock);
249 SocketRecv::~SocketRecv()
252 shutdown(m_sender, 2);
254 printf(
"SocketRecv:: receiving socket %d closed\n", m_sender);
259 printf(
"SocketRecv:: connection socket %d closed\n", m_sock);
262 int SocketRecv::reconnect(
int ntry)
269 bzero(&m_sa,
sizeof(m_sa));
270 bcopy(m_hp->h_addr, (
char*)&m_sa.sin_addr, m_hp->h_length);
271 m_sa.sin_family = m_hp->h_addrtype;
272 m_sa.sin_port = htons((u_short)m_port);
277 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
279 perror(
"RSocketRecv:socket");
282 int sizeval = D2_SOCKBUF_SIZE;
283 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
284 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
286 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
295 printf(
"RSocketRecv: reconnecting (trial %d) \n", ntry - maxretry + 1);
296 int istat = connect(m_sock, (
struct sockaddr*)&m_sa,
sizeof(m_sa));
298 printf(
"RSocketRecv: reconnected\n");
302 if (maxretry == 0)
return -1;
305 printf(
"RSocketRecv: m_sock = %d reconnected.\n", m_sock);
308 int SocketRecv::accept()
311 struct sockaddr_in isa;
312 socklen_t i =
sizeof(isa);
313 getsockname(m_sock, (
struct sockaddr*)&isa, &i);
316 if ((t =::accept(m_sock, (
struct sockaddr*)&isa, &i)) < 0) {
321 printf(
"SocketRecv:: connection request accepted, sender=%d\n", t);
327 int SocketRecv::close()
331 printf(
"SocketRecv: destructed, m_sender = %d\n", m_sender);
335 int SocketRecv::examine()
342 FD_SET(m_sock, &ready);
346 if (select(FD_SETSIZE, &ready, 0, 0, &to) < 0) {
351 if (FD_ISSET(m_sock, &ready)) {
352 printf(
"SocketRecv::connected!!!!\n");
358 int SocketRecv::get(
char* data,
int len)
364 return m_io.get(m_sender, data, len);
369 int SocketRecv::get_wordbuf(
int* data,
int len)
375 return m_io.get_wordbuf(m_sender, data, len);
380 int SocketRecv::read(
char* data,
int len)
384 return m_io.read_data(m_sender, data, len);
389 int SocketRecv::put(
char* data,
int len)
394 return m_io.put(m_sender, data, len);
399 int SocketRecv::write(
char* data,
int len)
403 return m_io.write_data(m_sender, data, len);
409 int SocketRecv::sock()
const
414 void SocketRecv::sock(
int sockid)
419 int SocketRecv::sender()
const
424 int SocketRecv::err()
const
429 void SocketRecv::interrupt()
437 SocketSend::SocketSend(
const char* node, u_short port)
441 if ((m_hp = gethostbyname(node)) == NULL) {
444 "SocketSend::gethostbyname(%s): not found\n", node);
448 struct sockaddr_in sa;
449 bzero(&sa,
sizeof(sa));
450 bcopy(m_hp->h_addr, (
char*)&sa.sin_addr, m_hp->h_length);
451 sa.sin_family = m_hp->h_addrtype;
452 sa.sin_port = htons((u_short)port);
456 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
458 perror(
"SocketSend:socket");
461 int sizeval = D2_SOCKBUF_SIZE;
462 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
463 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
465 signal(SIGPIPE, SIG_IGN);
471 if (connect(s, (
struct sockaddr*)&sa,
sizeof(sa)) < 0) {
473 perror(
"SocketSend:connect");
474 printf(
"connection error..... m_sock set to %d\n", m_sock);
475 if (m_errno == ETIMEDOUT) {
476 printf(
".... try again after 5 sec. \n");
479 if (maxretry == 0) exit(1);
487 strcpy(m_node, node);
488 signal(SIGPIPE, SIG_IGN);
489 printf(
"SocketSend: initialized, m_sock = %d\n", s);
492 SocketSend::~SocketSend()
496 printf(
"SocketSend: destructed, m_sock = %d\n", m_sock);
499 int SocketSend::reconnect(
int ntry)
506 bzero(&m_sa,
sizeof(m_sa));
507 bcopy(m_hp->h_addr, (
char*)&m_sa.sin_addr, m_hp->h_length);
508 m_sa.sin_family = m_hp->h_addrtype;
509 m_sa.sin_port = htons((u_short)m_port);
514 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
516 perror(
"RSocketRecv:socket");
519 int sizeval = D2_SOCKBUF_SIZE;
520 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
521 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
523 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
532 printf(
"SocketSend: reconnecting (trial %d) \n", ntry - maxretry + 1);
533 int istat = connect(m_sock, (
struct sockaddr*)&m_sa,
sizeof(m_sa));
535 printf(
"SocketSend: reconnected\n");
539 if (maxretry == 0)
return -1;
542 printf(
"SocketSend: m_sock = %d reconnected.\n", m_sock);
545 int SocketSend::get(
char* data,
int len)
548 return m_io.get(m_sock, data, len);
551 int SocketSend::get_pxd(
char* data,
int len)
554 return m_io.
get_pxd(m_sock, data, len);
557 int SocketSend::read(
char* data,
int len)
560 return m_io.read_data(m_sock, data, len);
563 int SocketSend::put(
char* data,
int len)
567 return m_io.put(m_sock, data, len);
570 int SocketSend::put_wordbuf(
int* data,
int len)
574 return m_io.put_wordbuf(m_sock, data, len);
577 int SocketSend::write(
char* data,
int len)
580 return m_io.write_data(m_sock, data, len);
583 char* SocketSend::node()
588 int SocketSend::port()
593 int SocketSend::sock()
598 void SocketSend::sock(
int sockid)
603 int SocketSend::err()
int get_pxd(int sock, char *data, int len)
Abstract base class for different kinds of events.