14#include <sys/socket.h>
15#include <netinet/in.h>
20#include "daq/dataflow/SocketLib.h"
22#define MAXHOSTNAME 100
36int SocketIO::put(
int sock,
char* data,
int len)
38 int to_size = htonl(len);
40 int br = write_data(sock, (
char*)&to_size, 4);
42 perror(
"put: sending size");
46 int bcount = write_data(sock, data, len);
47 if (bcount < 0) perror(
"put: sending data");
51int SocketIO::put_wordbuf(
int sock,
int* data,
int len)
54 int bcount = write_data(sock, (
char*)data, len *
sizeof(
int));
55 if (bcount < 0) perror(
"put: sending data");
59 return ((bcount - 1) / 4 + 1);
62int SocketIO::write_data(
int sock,
char* data,
int len)
70 while (bcount < len) {
72 if ((br =::write(sock, ptr, len - bcount)) > 0) {
95int SocketIO::get(
int sock,
char* data,
int len)
98 int br = read_data(sock, (
char*)&gcount, 4);
99 if (br <= 0)
return br;
100 gcount = ntohl(gcount);
102 printf(
"buffer too small : %d(%d)", gcount, len);
105 int bcount = read_data(sock, data, gcount);
109int SocketIO::get_wordbuf(
int sock,
int* wrdbuf,
int len)
112 int br = read_data(sock, (
char*)wrdbuf,
sizeof(
int));
113 if (br <= 0)
return br;
118 printf(
"buffer too small : %d(%d)", gcount, len);
121 read_data(sock, (
char*)&wrdbuf[1], (gcount - 1) *
sizeof(
int));
128#define MAX_PXD_FRAMES 256
129 const int headerlen = 8;
130 int* pxdheader = (
int*) data;
131 int* pxdheadertable = (
int*) &data[headerlen];
132 int framenr = 0, tablelen = 0, datalen = 0;
133 int br = read_data(sock, data, headerlen);
134 if (br <= 0)
return br;
135 if (
static_cast<uint
>(pxdheader[0]) != htonl(0xCAFEBABE)) {
136 printf(
"pxdheader wrong : Magic %X , Frames %X \n", pxdheader[0], ntohl(pxdheader[1]));
139 pxdheader[0] = ntohl(pxdheader[0]);
140 framenr = pxdheader[1] = ntohl(pxdheader[1]);
141 if (framenr > MAX_PXD_FRAMES) {
142 printf(
"MAX_PXD_FRAMES too small : %d(%d) \n", framenr, MAX_PXD_FRAMES);
145 tablelen = 4 * framenr;
146 br = read_data(sock, (
char*)&data[headerlen], tablelen);
147 if (br <= 0)
return br;
148 for (
int i = 0; i < framenr; i++) {
149 pxdheadertable[i] = ntohl(pxdheadertable[i]);
150 datalen += (pxdheadertable[i] + 3) & 0xFFFFFFFC;
153 if (datalen + headerlen + tablelen > len) {
154 printf(
"buffer too small : %d %d %d(%d) \n", headerlen, tablelen, datalen, len);
157 int bcount = read_data(sock, data + headerlen + tablelen, datalen);
158 return (headerlen + tablelen + bcount);
161int SocketIO::read_data(
int sock,
char* data,
int len)
166 while (bcount < len) {
168 if ((br =::read(sock, buf, len - bcount)) > 0) {
173 if (br == 0)
return 0;
178 printf(
"read: interrupted!\n");
187 perror(
"SocketIO:read");
188 fprintf(stderr,
"sock = %d, buf=%p, len = %d\n", sock, buf, len - bcount);
197void SocketIO::interrupt(
void)
204SocketRecv::SocketRecv(u_short p)
207 struct sockaddr_in sa;
208 bzero(&sa,
sizeof(
struct sockaddr_in));
212 sa.sin_family = AF_INET;
213 sa.sin_port = htons(p);
216 if ((s = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
218 perror(
"SocketRecv::socket");
223 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
225 int sizeval = D2_SOCKBUF_SIZE;
226 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
227 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
229 signal(SIGPIPE, SIG_IGN);
233 if ((bind(s, (
struct sockaddr*)&sa,
sizeof(sa))) < 0) {
235 perror(
"SocketRecv::bind");
242 printf(
"SocketRecv:: initialized, sock=%d\n", m_sock);
248SocketRecv::~SocketRecv()
251 shutdown(m_sender, 2);
253 printf(
"SocketRecv:: receiving socket %d closed\n", m_sender);
258 printf(
"SocketRecv:: connection socket %d closed\n", m_sock);
261int SocketRecv::accept()
264 struct sockaddr_in isa;
265 socklen_t i =
sizeof(isa);
266 getsockname(m_sock, (
struct sockaddr*)&isa, &i);
269 if ((t =::accept(m_sock, (
struct sockaddr*)&isa, &i)) < 0) {
274 printf(
"SocketRecv:: connection request accepted, sender=%d\n", t);
280int SocketRecv::close()
284 printf(
"SocketRecv: destructed, m_sender = %d\n", m_sender);
288int SocketRecv::examine()
295 FD_SET(m_sock, &ready);
299 if (select(FD_SETSIZE, &ready, 0, 0, &to) < 0) {
304 if (FD_ISSET(m_sock, &ready)) {
305 printf(
"SocketRecv::connected!!!!\n");
311int SocketRecv::get(
char* data,
int len)
317 return m_io.get(m_sender, data, len);
322int SocketRecv::get_wordbuf(
int* data,
int len)
328 return m_io.get_wordbuf(m_sender, data, len);
333int SocketRecv::read(
char* data,
int len)
337 return m_io.read_data(m_sender, data, len);
342int SocketRecv::put(
char* data,
int len)
347 return m_io.put(m_sender, data, len);
352int SocketRecv::write(
char* data,
int len)
356 return m_io.write_data(m_sender, data, len);
362int SocketRecv::sock()
const
367void SocketRecv::sock(
int sockid)
372int SocketRecv::sender()
const
377int SocketRecv::err()
const
382void SocketRecv::interrupt()
390SocketSend::SocketSend(
const char* node, u_short port)
394 if ((m_hp = gethostbyname(node)) == NULL) {
397 "SocketSend::gethostbyname(%s): not found\n", node);
401 struct sockaddr_in sa;
402 bzero(&sa,
sizeof(sa));
403 bcopy(m_hp->h_addr, (
char*)&sa.sin_addr, m_hp->h_length);
404 sa.sin_family = m_hp->h_addrtype;
405 sa.sin_port = htons((u_short)port);
409 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
411 perror(
"SocketSend:socket");
414 int sizeval = D2_SOCKBUF_SIZE;
415 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
416 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
418 signal(SIGPIPE, SIG_IGN);
424 if (connect(s, (
struct sockaddr*)&sa,
sizeof(sa)) < 0) {
426 perror(
"SocketSend:connect");
427 printf(
"connection error..... m_sock set to %d\n", m_sock);
428 if (m_errno == ETIMEDOUT) {
429 printf(
".... try again after 5 sec. \n");
432 if (maxretry == 0) exit(1);
440 strcpy(m_node, node);
441 signal(SIGPIPE, SIG_IGN);
442 printf(
"SocketSend: initialized, m_sock = %d\n", s);
445SocketSend::~SocketSend()
449 printf(
"SocketSend: destructed, m_sock = %d\n", m_sock);
452int SocketSend::reconnect(
int ntry)
459 bzero(&m_sa,
sizeof(m_sa));
460 bcopy(m_hp->h_addr, (
char*)&m_sa.sin_addr, m_hp->h_length);
461 m_sa.sin_family = m_hp->h_addrtype;
462 m_sa.sin_port = htons((u_short)m_port);
467 if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
469 perror(
"RSocketRecv:socket");
472 int sizeval = D2_SOCKBUF_SIZE;
473 setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
474 setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
476 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
485 printf(
"SocketSend: reconnecting (trial %d) \n", ntry - maxretry + 1);
486 int istat = connect(m_sock, (
struct sockaddr*)&m_sa,
sizeof(m_sa));
488 printf(
"SocketSend: reconnected\n");
492 if (maxretry == 0)
return -1;
495 printf(
"SocketSend: m_sock = %d reconnected.\n", m_sock);
498int SocketSend::get(
char* data,
int len)
501 return m_io.get(m_sock, data, len);
504int SocketSend::get_pxd(
char* data,
int len)
507 return m_io.
get_pxd(m_sock, data, len);
510int SocketSend::read(
char* data,
int len)
513 return m_io.read_data(m_sock, data, len);
516int SocketSend::put(
char* data,
int len)
520 return m_io.put(m_sock, data, len);
523int SocketSend::put_wordbuf(
int* data,
int len)
527 return m_io.put_wordbuf(m_sock, data, len);
530int SocketSend::write(
char* data,
int len)
533 return m_io.write_data(m_sock, data, len);
536char* SocketSend::node()
541int SocketSend::port()
546int SocketSend::sock()
551void SocketSend::sock(
int sockid)
int get_pxd(int sock, char *data, int len)
Abstract base class for different kinds of events.