Belle II Software  release-05-01-25
b2hlt_file2rb.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Ryosuke Itoh, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/rfarm/manager/RFFlowStat.h>
11 #include <framework/pcore/RingBuffer.h>
12 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
13 #include <daq/rfarm/event/hltsocket/HLTFile.h>
14 #include <framework/logging/Logger.h>
15 #include <framework/pcore/EvtMessage.h>
16 
17 #include <boost/program_options.hpp>
18 #include <iostream>
19 
20 #define MAXEVTSIZE 80000000
21 
22 using namespace Belle2;
23 namespace po = boost::program_options;
24 
25 int main(int argc, char* argv[])
26 {
27  std::string ringBufferName;
28  std::string shmName;
29  unsigned int shmID;
30  bool raw;
31  bool repeat;
32  std::string file_name;
33 
34  po::options_description desc("b2hlt_file2rb FILE-NAME RING-BUFFER-NAME SHM-NAME SHM-ID");
35  desc.add_options()
36  ("help,h", "Print this help message")
37  ("ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(), "name of the ring buffer")
38  ("file-name,f", po::value<std::string>(&file_name)->required(), "file name to write to")
39  ("shm-name,n", po::value<std::string>(&shmName)->required(), "name of the shm for flow output")
40  ("shm-id,i", po::value<unsigned int>(&shmID)->required(), "id in the shm for flow output")
41  ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers")
42  ("repeat", po::bool_switch(&repeat)->default_value(false), "repeat after the file is finished");
43 
44 
45  po::positional_options_description p;
46  p.add("file-name", 1).add("ring-buffer-name", 1).add("shm-name", 1).add("shm-id", 1);
47 
48  po::variables_map vm;
49  try {
50  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
51  } catch (std::exception& e) {
52  B2FATAL(e.what());
53  }
54 
55  if (vm.count("help")) {
56  std::cout << desc << std::endl;
57  return 1;
58  }
59 
60  try {
61  po::notify(vm);
62  } catch (std::exception& e) {
63  B2FATAL(e.what());
64  }
65 
66  // TODO: delete or not?
67  RingBuffer* ringBuffer = new RingBuffer(ringBufferName.c_str());
68  RFFlowStat flow((char*)shmName.c_str(), shmID, ringBuffer);
69  int* buffer = new int[MAXEVTSIZE];
70 
71  HLTMainLoop mainLoop;
72 
73  HLTFile file;
74  int size;
75  int nevt = 0;
76  bool terminate = false;
77 
78  if (not file.open(file_name, raw, "r")) {
79  B2ERROR("Can not open file");
80  terminate = true;
81  }
82 
83  while (mainLoop.isRunning() and not terminate) {
84  // Read from socket
85  if (raw) {
86  size = file.get_wordbuf(buffer, MAXEVTSIZE);
87  } else {
88  size = file.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
89  }
90  // Error checking socket
91  if (size == 0) {
92  if (repeat) {
93  file.open(file_name, raw, "r");
94  continue;
95  } else {
96  B2RESULT("Reached end of file");
97  break;
98  }
99  } else if (size < 0) {
100  if (mainLoop.isRunning()) {
101  B2ERROR("Error in receiving the event! Aborting.");
102  }
103  // This is fine if we are terminating anyways
104  break;
105  }
106  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
107 
108  // Terminate messages make us terminate
109  if (not raw) {
110  EvtMessage message(reinterpret_cast<char*>(buffer));
111  if (message.type() == MSG_TERMINATE) {
112  B2RESULT("Having received terminate message");
113  terminate = true;
114  }
115  }
116 
117  // Monitoring
118  flow.log(size * sizeof(int));
119 
120  // Write to ring buffer
121  const int returnValue = mainLoop.writeToRingBufferWaiting(ringBuffer, buffer, size);
122  // Error check ring buffer
123  if (returnValue <= 0) {
124  if (mainLoop.isRunning()) {
125  B2ERROR("Writing to the ring buffer failed!");
126  }
127  // This is fine if we are terminating anyways
128  break;
129  }
130  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
131 
132  // Logging
133  nevt++;
134  if (nevt % 5000 == 0) {
135  B2RESULT("b2hlt_file2rb event number: " << nevt);
136  }
137  }
138 
139  B2RESULT("Program terminated.");
140 }
prepareAsicCrosstalkSimDB.e
e
aux.
Definition: prepareAsicCrosstalkSimDB.py:53
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::RFFlowStat
Definition: RFFlowStat.h:28
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::HLTMainLoop
Definition: HLTMainLoop.h:28
Belle2::HLTFile
Definition: HLTFile.h:11