Belle II Software development
storagein.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <unistd.h>
10#include <cstdlib>
11#include <iostream>
12
13#include <framework/logging/Logger.h>
14
15#include <daq/storage/BinData.h>
16#include <daq/storage/SharedEventBuffer.h>
17
18#include <daq/slc/base/IOException.h>
19#include <daq/slc/readout/RunInfoBuffer.h>
20
21#include <daq/slc/system/TCPSocket.h>
22#include <daq/slc/system/TCPSocketReader.h>
23#include <daq/slc/system/Time.h>
24#include <daq/slc/system/LogFile.h>
25
26using namespace Belle2;
27
28int main(int argc, char** argv)
29{
30 if (argc < 4) {
31 LogFile::debug("%s : bufname bufsize hostname port "
32 "[nodename, nodeid]", argv[0]);
33 return 1;
34 }
35
36 RunInfoBuffer info;
37 bool use_info = (argc > 6);
38 if (use_info) {
39 info.open(argv[5], atoi(argv[6]));
40 }
42 ibuf.open(argv[1], atoi(argv[2]) * 1000000);//, true);
43 info.reportReady();
44 TCPSocket socket(argv[3], atoi(argv[4]));
45 info.reportReady();
46 int* evtbuf = new int[10000000];
47 BinData data;
48 data.setBuffer(evtbuf);
49 Time t0;
50 int expno = 0;
51 int runno = 0;
52 int subno = 0;
53 int ntried = 0;
54 while (true) {
55 while (socket.get_fd() <= 0) {
56 try {
57 socket.connect();
58 B2INFO("Connected to data source");
59 socket.setBufferSize(32 * 1024 * 1024);
60 ntried = 0;
61 if (info.isAvailable()) {
62 info.setInputPort(socket.getLocalPort());
63 info.setInputAddress(socket.getLocalAddress());
64 }
65 break;
66 } catch (const IOException& e) {
67 socket.close();
68 if (info.isAvailable()) {
69 info.setInputPort(0);
70 info.setInputAddress(0);
71 }
72 if (ntried < 5)
73 B2WARNING("failed to connect to eb2 (try=" << ntried++ << ")");
74 sleep(5);
75 }
76 }
77 info.reportRunning();
78 try {
79 TCPSocketReader reader(socket);
80 B2INFO("storagein: Cconnected to eb2.");
81 int count = 0;
82 while (true) {
83 reader.read(data.getBuffer(), sizeof(int));
84 unsigned int nbyte = data.getByteSize() - sizeof(int);
85 int nword = data.getWordSize();
86 reader.read((data.getBuffer() + 1), nbyte);
87 nbyte += sizeof(int);
88 if (info.isAvailable()) {
89 info.addInputCount(1);
90 info.addInputNBytes(nbyte);
91 }
92 if (expno > data.getExpNumber() || runno > data.getRunNumber()) {
93 /*
94 B2WARNING("storagein: old run event detected : exp="
95 << data.getExpNumber() << " runno="
96 << data.getRunNumber() << " current = ("
97 << expno << "," << runno << ")");
98 */
99 continue;
100 } else if (expno < data.getExpNumber() || runno < data.getRunNumber()) {
101 expno = data.getExpNumber();
102 runno = data.getRunNumber();
103 B2INFO("new run detected : exp=" << expno << " runno=" << runno);
104 SharedEventBuffer::Header* iheader = ibuf.getHeader();
105 iheader->expno = expno;
106 iheader->runno = runno;
107 iheader->subno = subno;
108 if (info.isAvailable()) {
109 info.setExpNumber(expno);
110 info.setRunNumber(runno);
111 info.setSubNumber(subno);
112 info.setInputCount(0);
113 info.setInputNBytes(0);
114 info.setOutputCount(0);
115 info.setOutputNBytes(0);
116 }
117 count = 0;
118 }
119 if (count < 1000000 && (count < 10 ||
120 (count > 10 && count < 100 && count % 10 == 0) ||
121 (count > 100 && count < 1000 && count % 100 == 0) ||
122 (count > 1000 && count < 10000 && count % 1000 == 0) ||
123 (count > 10000 && count < 100000 && count % 10000 == 0) ||
124 (count > 100000 && count % 100000 == 0))) {
125 std::cout << "[DEBUG] Event count = " << count << " nword = " << nword << std::endl;
126 }
127 count++;
128 ibuf.write(data.getBuffer(), nword, true);
129 if (info.isAvailable()) {
130 info.addOutputCount(1);
131 info.addOutputNBytes(nword * sizeof(int));
132 }
133 }
134 } catch (const IOException& e) {
135 socket.close();
136 if (info.isAvailable()) info.setInputPort(0);
137 B2WARNING("Connection to eb2 broken.");
138 sleep(5);
139 }
140 }
141 return 0;
142}
143
Abstract base class for different kinds of events.