Belle II Software  release-08-01-10
b2hlt_rb2file.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/rfarm/manager/RFFlowStat.h>
9 #include <framework/pcore/RingBuffer.h>
10 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
11 #include <daq/rfarm/event/hltsocket/HLTFile.h>
12 #include <framework/logging/Logger.h>
13 #include <framework/pcore/EvtMessage.h>
14 
15 #include <boost/program_options.hpp>
16 #include <iostream>
17 
18 #define MAXEVTSIZE 80000000
19 
20 using namespace Belle2;
21 namespace po = boost::program_options;
22 
23 int main(int argc, char* argv[])
24 {
25  std::string ringBufferName;
26  std::string shmName;
27  unsigned int shmID;
28  bool raw;
29  std::string file_name;
30 
31  po::options_description desc("b2hlt_rb2socket RING-BUFFER-NAME FILE-NAME SHM-NAME SHM-ID");
32  desc.add_options()
33  ("help,h", "Print this help message")
34  ("ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(), "name of the ring buffer")
35  ("file-name,f", po::value<std::string>(&file_name)->required(), "file name to write to")
36  ("shm-name,n", po::value<std::string>(&shmName)->required(), "name of the shm for flow output")
37  ("shm-id,i", po::value<unsigned int>(&shmID)->required(), "id in the shm for flow output")
38  ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers");
39 
40 
41  po::positional_options_description p;
42  p.add("ring-buffer-name", 1).add("file-name", 1).add("shm-name", 1).add("shm-id", 1);
43 
44  po::variables_map vm;
45  try {
46  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
47  } catch (std::exception& e) {
48  B2FATAL(e.what());
49  }
50 
51  if (vm.count("help")) {
52  std::cout << desc << std::endl;
53  return 1;
54  }
55 
56  try {
57  po::notify(vm);
58  } catch (std::exception& e) {
59  B2FATAL(e.what());
60  }
61 
62  // TODO: delete or not?
63  RingBuffer* ringBuffer = new RingBuffer(ringBufferName.c_str());
64  RFFlowStat flow((char*)shmName.c_str(), shmID, ringBuffer);
65  int* buffer = new int[MAXEVTSIZE];
66 
67  HLTMainLoop mainLoop;
68 
69  HLTFile file;
70  int returnValue;
71  int nevt = 0;
72  bool terminate = false;
73 
74  if (not file.open(file_name, raw, "w")) {
75  B2ERROR("Can not open file");
76  terminate = true;
77  }
78 
79  while (mainLoop.isRunning() and not terminate) {
80  // Read from ring buffer
81  const int size = mainLoop.readFromRingBufferWaiting(ringBuffer, buffer);
82  // Error checking ring buffer
83  if (size <= 0) {
84  if (mainLoop.isRunning()) {
85  B2ERROR("Writing to the ring buffer failed!");
86  }
87  // This is fine if we are terminating anyways
88  break;
89  }
90  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
91 
92  // Monitoring
93  flow.log(size * sizeof(int));
94 
95  if (raw) {
96  returnValue = file.put_wordbuf(buffer, size);
97  } else {
98  EvtMessage message(reinterpret_cast<char*>(buffer));
99  returnValue = file.put(message.buffer(), message.size());
100  // Terminate messages make us terminate
101  if (message.type() == MSG_TERMINATE) {
102  B2RESULT("Having received terminate message");
103  terminate = true;
104  }
105  }
106  if (returnValue <= 0) {
107  if (mainLoop.isRunning()) {
108  B2ERROR("Error in writing the event! Aborting.");
109  }
110  // This is fine if we are terminating anyways
111  break;
112  }
113  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
114 
115  // Logging
116  nevt++;
117  if (nevt % 5000 == 0) {
118  B2RESULT("b2hlt_rb2file event number: " << nevt);
119  }
120  }
121 
122  B2RESULT("Program terminated.");
123 }
Class to manage streamed object.
Definition: EvtMessage.h:59
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91