Belle II Software development
file2sockr.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
8#include <string>
9#include <vector>
10
11#include <stdio.h>
12#include <stdlib.h>
13#include <unistd.h>
14#include <sys/time.h>
15
16#include "framework/pcore/SeqFile.h"
17#include "daq/dataflow/REvtSocket.h"
18
19#include "TRandom.h"
20
// Number of sent events between two consecutive rate/flow printouts in main().
21#define EVENTINTERVAL 5000
22
23using namespace Belle2;
24using namespace std;
25
26int main(int argc, char** argv)
27{
28 if (argc < 3) {
29 printf("file2sock : filename port poisson_freq file_interval\n");
30 exit(-1);
31 }
32
33 // Poisson random number generate
34 TRandom rand;
35
36 string filename(argv[1]);
37 int port = atoi(argv[2]);
38 int pfreq = atoi(argv[3]);
39
40 double minterval = 0.0;
41 if (pfreq != 0)
42 minterval = 1.0E6 / (double)pfreq;
43
44 vector<string> filelist;
45 // Check file
46 if ((int)filename.rfind(".list") != -1) {
47 FILE* fd = fopen(filename.c_str(), "r");
48 for (;;) {
49 char listfile[1024];
50 int is = fscanf(fd, "%s", listfile);
51 if (is <= 0) break;
52 filelist.push_back(string(listfile));
53 }
54 } else {
55 filelist.push_back(filename);
56 printf("File %s is put ln the list\n", filename.c_str());
57 }
58 printf("# of input files = %d\n", (int)filelist.size());
59 for (int i = 0; i < (int)filelist.size(); i++) {
60 printf("file = %s\n", filelist[i].c_str());
61 }
62
63 int fileptr = 0;
64
65 // Open EventSocket
66 REvtSocketSend* sock = new REvtSocketSend(port);
67
68 // Event Buffer
69 char* evbuf = new char[MAXEVTSIZE];
70
71 // Open 1st file
72 SeqFile* file = new SeqFile(filelist[fileptr++].c_str(), "r");
73 if (file->status() <= 0) {
74 perror("file open");
75 exit(-1);
76 }
77 // Skip the first record (StreamerInfo)
78 int is = file->read(evbuf, MAXEVTSIZE);
79 if (is <= 0) {
80 printf("Error in reading file : %d\n", is);
81 exit(-1);
82 }
83
84 // Event / time counter
85 int nevent = 0;
86 struct timeval tnow;
87 struct timeval tprev;
88 gettimeofday(&tnow, NULL);
89 gettimeofday(&tprev, NULL);
90
91 double datasize = 0.0;
92 printf("Start event loop\n");
93 // Loop for event records
94 for (;;) {
95 int isN = file->read(evbuf, MAXEVTSIZE);
96 if (isN < 0) {
97 printf("Error in reading file : %d\n", isN);
98 break;
99 } else if (isN > MAXEVTSIZE) {
100 printf("Event size too large : %d\n", isN);
101 continue;
102 } else if (isN == 0) {
103 delete file;
104 if (fileptr == (int)filelist.size()) {
105 printf("End of file list reached. Exitting\n");
106 break;
107 }
108 file = new SeqFile(filelist[fileptr++].c_str(), "r");
109 if (file->status() <= 0) {
110 perror("file open");
111 exit(-1);
112 }
113 // Skip the first record (StreamerInfo)
114 int isNow = file->read(evbuf, MAXEVTSIZE);
115 if (isNow <= 0) {
116 printf("Error in reading file : %d\n", isNow);
117 exit(-1);
118 }
119 // Read next record (Event)
120 isNow = file->read(evbuf, MAXEVTSIZE);
121 if (isNow < 0) {
122 printf("Error in reading file : %d\n", isNow);
123 exit(-1);
124 }
125 // Wait for 5 sec so that processing of previous file is completed.
126 sleep(10);
127 }
128
129 // Put the message to Socket
130 EvtMessage* msg = new EvtMessage(evbuf); // Ptr copy, no overhead
131
132 if (msg->type() == MSG_TERMINATE) {
133 printf("EoF found. Exitting.....\n");
134 sock->send(msg);
135 delete msg;
136 return -1;
137 } else if (msg->type() == MSG_STREAMERINFO) {
138 printf("StreamerInfo. Skipped....\n");
139 continue;
140 } else {
141 int isNow = sock->send(msg);
142 delete msg;
143 if (isNow <= 0) {
144 printf("Cannot send event. Exitting\n");
145 return -1;
146 }
147 // return msg->size();
148 }
149 if (minterval != 0)
150 usleep(rand.Poisson(minterval));
151
152 nevent++;
153 datasize += (double)isN;
154
155 if (nevent % EVENTINTERVAL == 0) {
156 gettimeofday(&tnow, NULL);
157 double delta = (double)((tnow.tv_sec - tprev.tv_sec) * 1000000 +
158 (tnow.tv_usec - tprev.tv_usec));
159 double rate = ((double)EVENTINTERVAL) / delta * 1.0E6;
160 double flow = datasize / delta;
161 printf("Event = %8d; Ave. rate = %7.2f Hz, flow = %7.2f MB/s\n",
162 nevent, rate, flow);
163 tprev = tnow;
164 datasize = 0.0;
165 }
166 }
167}
168
169
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
Abstract base class for different kinds of events.
STL namespace.