Belle II Software  release-08-01-10
file2sockr.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <string>
9 #include <vector>
10 
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <unistd.h>
14 #include <sys/time.h>
15 
16 #include "framework/pcore/SeqFile.h"
17 #include "daq/dataflow/REvtSocket.h"
18 
19 #include "TRandom.h"
20 
21 #define EVENTINTERVAL 5000
22 
23 using namespace Belle2;
24 using namespace std;
25 
26 int main(int argc, char** argv)
27 {
28  if (argc < 3) {
29  printf("file2sock : filename port poisson_freq file_interval\n");
30  exit(-1);
31  }
32 
33  // Poisson random number generate
34  TRandom rand;
35 
36  string filename(argv[1]);
37  int port = atoi(argv[2]);
38  int pfreq = atoi(argv[3]);
39 
40  double minterval = 0.0;
41  if (pfreq != 0)
42  minterval = 1.0E6 / (double)pfreq;
43 
44  vector<string> filelist;
45  // Check file
46  if ((int)filename.rfind(".list") != -1) {
47  FILE* fd = fopen(filename.c_str(), "r");
48  for (;;) {
49  char listfile[1024];
50  int is = fscanf(fd, "%s", listfile);
51  if (is <= 0) break;
52  filelist.push_back(string(listfile));
53  }
54  } else {
55  filelist.push_back(filename);
56  printf("File %s is put ln the list\n", filename.c_str());
57  }
58  printf("# of input files = %d\n", (int)filelist.size());
59  for (int i = 0; i < (int)filelist.size(); i++) {
60  printf("file = %s\n", filelist[i].c_str());
61  }
62 
63  int fileptr = 0;
64 
65  // Open EventSocket
66  REvtSocketSend* sock = new REvtSocketSend(port);
67 
68  // Event Buffer
69  char* evbuf = new char[MAXEVTSIZE];
70 
71  // Open 1st file
72  SeqFile* file = new SeqFile(filelist[fileptr++].c_str(), "r");
73  if (file->status() <= 0) {
74  perror("file open");
75  exit(-1);
76  }
77  // Skip the first record (StreamerInfo)
78  int is = file->read(evbuf, MAXEVTSIZE);
79  if (is <= 0) {
80  printf("Error in reading file : %d\n", is);
81  exit(-1);
82  }
83 
84  // Event / time counter
85  int nevent = 0;
86  struct timeval tnow;
87  struct timeval tprev;
88  gettimeofday(&tnow, NULL);
89  gettimeofday(&tprev, NULL);
90 
91  double datasize = 0.0;
92  printf("Start event loop\n");
93  // Loop for event records
94  for (;;) {
95  int isN = file->read(evbuf, MAXEVTSIZE);
96  if (isN < 0) {
97  printf("Error in reading file : %d\n", isN);
98  break;
99  } else if (isN > MAXEVTSIZE) {
100  printf("Event size too large : %d\n", isN);
101  continue;
102  } else if (isN == 0) {
103  delete file;
104  if (fileptr == (int)filelist.size()) {
105  printf("End of file list reached. Exitting\n");
106  break;
107  }
108  file = new SeqFile(filelist[fileptr++].c_str(), "r");
109  if (file->status() <= 0) {
110  perror("file open");
111  exit(-1);
112  }
113  // Skip the first record (StreamerInfo)
114  int isNow = file->read(evbuf, MAXEVTSIZE);
115  if (isNow <= 0) {
116  printf("Error in reading file : %d\n", isNow);
117  exit(-1);
118  }
119  // Read next record (Event)
120  isNow = file->read(evbuf, MAXEVTSIZE);
121  if (isNow < 0) {
122  printf("Error in reading file : %d\n", isNow);
123  exit(-1);
124  }
125  // Wait for 5 sec so that processing of previous file is completed.
126  sleep(10);
127  }
128 
129  // Put the message to Socket
130  EvtMessage* msg = new EvtMessage(evbuf); // Ptr copy, no overhead
131 
132  if (msg->type() == MSG_TERMINATE) {
133  printf("EoF found. Exitting.....\n");
134  sock->send(msg);
135  delete msg;
136  return -1;
137  } else if (msg->type() == MSG_STREAMERINFO) {
138  printf("StreamerInfo. Skipped....\n");
139  continue;
140  } else {
141  int isNow = sock->send(msg);
142  delete msg;
143  if (isNow <= 0) {
144  printf("Cannot send event. Exitting\n");
145  return -1;
146  }
147  // return msg->size();
148  }
149  if (minterval != 0)
150  usleep(rand.Poisson(minterval));
151 
152  nevent++;
153  datasize += (double)isN;
154 
155  if (nevent % EVENTINTERVAL == 0) {
156  gettimeofday(&tnow, NULL);
157  double delta = (double)((tnow.tv_sec - tprev.tv_sec) * 1000000 +
158  (tnow.tv_usec - tprev.tv_usec));
159  double rate = ((double)EVENTINTERVAL) / delta * 1.0E6;
160  double flow = datasize / delta;
161  printf("Event = %8d; Ave. rate = %7.2f Hz, flow = %7.2f MB/s\n",
162  nevent, rate, flow);
163  tprev = tnow;
164  datasize = 0.0;
165  }
166  }
167 }
168 
169 
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91