Belle II Software development
b2hlt_file2rb.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/rfarm/manager/RFFlowStat.h>
9#include <framework/pcore/RingBuffer.h>
10#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
11#include <daq/rfarm/event/hltsocket/HLTFile.h>
12#include <framework/logging/Logger.h>
13#include <framework/pcore/EvtMessage.h>
14
15#include <boost/program_options.hpp>
16#include <iostream>
17
18#define MAXEVTSIZE 80000000
19
20using namespace Belle2;
21namespace po = boost::program_options;
22
23int main(int argc, char* argv[])
24{
25 std::string ringBufferName;
26 std::string shmName;
27 unsigned int shmID;
28 bool raw = false;
29 bool repeat = false;
30 std::string file_name;
31
32 po::options_description desc("b2hlt_file2rb FILE-NAME RING-BUFFER-NAME SHM-NAME SHM-ID");
33 desc.add_options()
34 ("help,h", "Print this help message")
35 ("ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(), "name of the ring buffer")
36 ("file-name,f", po::value<std::string>(&file_name)->required(), "file name to write to")
37 ("shm-name,n", po::value<std::string>(&shmName)->required(), "name of the shm for flow output")
38 ("shm-id,i", po::value<unsigned int>(&shmID)->required(), "id in the shm for flow output")
39 ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers")
40 ("repeat", po::bool_switch(&repeat)->default_value(false), "repeat after the file is finished");
41
42
43 po::positional_options_description p;
44 p.add("file-name", 1).add("ring-buffer-name", 1).add("shm-name", 1).add("shm-id", 1);
45
46 po::variables_map vm;
47 try {
48 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
49 } catch (std::exception& e) {
50 B2FATAL(e.what());
51 }
52
53 if (vm.count("help")) {
54 std::cout << desc << std::endl;
55 return 1;
56 }
57
58 try {
59 po::notify(vm);
60 } catch (std::exception& e) {
61 B2FATAL(e.what());
62 }
63
64 // TODO: delete or not?
65 RingBuffer* ringBuffer = new RingBuffer(ringBufferName.c_str());
66 RFFlowStat flow((char*)shmName.c_str(), shmID, ringBuffer);
67 int* buffer = new int[MAXEVTSIZE];
68
69 HLTMainLoop mainLoop;
70
71 HLTFile file;
72 int size;
73 int nevt = 0;
74 bool terminate = false;
75
76 if (not file.open(file_name, raw, "r")) {
77 B2ERROR("Can not open file");
78 terminate = true;
79 }
80
81 while (mainLoop.isRunning() and not terminate) {
82 // Read from socket
83 if (raw) {
84 size = file.get_wordbuf(buffer, MAXEVTSIZE);
85 } else {
86 size = file.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
87 }
88 // Error checking socket
89 if (size == 0) {
90 if (repeat) {
91 file.open(file_name, raw, "r");
92 continue;
93 } else {
94 B2RESULT("Reached end of file");
95 break;
96 }
97 } else if (size < 0) {
98 if (mainLoop.isRunning()) {
99 B2ERROR("Error in receiving the event! Aborting.");
100 }
101 // This is fine if we are terminating anyways
102 break;
103 }
104 B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
105
106 // Terminate messages make us terminate
107 if (not raw) {
108 EvtMessage message(reinterpret_cast<char*>(buffer));
109 if (message.type() == MSG_TERMINATE) {
110 B2RESULT("Having received terminate message");
111 terminate = true;
112 }
113 }
114
115 // Monitoring
116 flow.log(size * sizeof(int));
117
118 // Write to ring buffer
119 const int returnValue = mainLoop.writeToRingBufferWaiting(ringBuffer, buffer, size);
120 // Error check ring buffer
121 if (returnValue <= 0) {
122 if (mainLoop.isRunning()) {
123 B2ERROR("Writing to the ring buffer failed!");
124 }
125 // This is fine if we are terminating anyways
126 break;
127 }
128 B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
129
130 // Logging
131 nevt++;
132 if (nevt % 5000 == 0) {
133 B2RESULT("b2hlt_file2rb event number: " << nevt);
134 }
135 }
136
137 B2RESULT("Program terminated.");
138}
Class to manage streamed object.
Definition: EvtMessage.h:59
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.