Belle II Software  release-08-01-10
b2hlt_socket2rb.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/rfarm/manager/RFFlowStat.h>
9 #include <framework/pcore/RingBuffer.h>
10 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
11 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
12 #include <framework/logging/Logger.h>
13 #include <framework/pcore/EvtMessage.h>
14 
15 #include <boost/program_options.hpp>
16 #include <iostream>
17 
// Number of int elements allocated for the receive buffer in main(). The same
// value is also passed as the size limit to HLTSocket::get_wordbuf() and
// HLTSocket::get().
// NOTE(review): HLTSocket::get() is handed a char* view of the buffer, so the
// limit is presumably interpreted in bytes there -- confirm against HLTSocket.
18 #define MAXEVTSIZE 80000000
19 
20 using namespace Belle2;
21 namespace po = boost::program_options;
22 
23 int main(int argc, char* argv[])
24 {
25  std::string ringBufferName;
26  unsigned int port;
27  std::string shmName;
28  unsigned int shmID;
29  bool raw;
30  std::string sourceHost;
31 
32  po::options_description desc("b2hlt_socket2rb PORT RING-BUFFER-NAME SHM-NAME SHM-ID");
33  desc.add_options()
34  ("help,h", "Print this help message")
35  ("port,p", po::value<unsigned int>(&port)->required(), "port number to connect or listen to")
36  ("ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(), "name of the ring buffer")
37  ("shm-name,n", po::value<std::string>(&shmName)->required(), "name of the shm for flow output")
38  ("shm-id,i", po::value<unsigned int>(&shmID)->required(), "id in the shm for flow output")
39  ("connect-to,c", po::value<std::string>(&sourceHost), "connect to a given host instead of listening")
40  ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers");
41 
42  po::positional_options_description p;
43  p.add("port", 1).add("ring-buffer-name", 1).add("shm-name", 1).add("shm-id", 1);
44 
45  po::variables_map vm;
46  try {
47  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
48  } catch (std::exception& e) {
49  B2FATAL(e.what());
50  }
51 
52  if (vm.count("help")) {
53  std::cout << desc << std::endl;
54  return 1;
55  }
56 
57  try {
58  po::notify(vm);
59  } catch (std::exception& e) {
60  B2FATAL(e.what());
61  }
62 
63  // TODO: delete or not?
64  RingBuffer* ringBuffer = new RingBuffer(ringBufferName.c_str());
65  RFFlowStat flow((char*)shmName.c_str(), shmID, ringBuffer);
66  int* buffer = new int[MAXEVTSIZE];
67 
68  HLTMainLoop mainLoop;
69 
70  HLTSocket socket;
71  int size;
72  int nevt = 0;
73  bool terminate = false;
74 
75  while (mainLoop.isRunning() and not terminate) {
76  // Connect socket if needed
77  if (not socket.initialized()) {
78  if (vm.count("connect-to")) {
79  if (not socket.connect(sourceHost, port, mainLoop)) {
80  B2ERROR("Could not reconnnect!");
81  break;
82  }
83  } else {
84  if (not socket.accept(port)) {
85  B2ERROR("Could not reconnect!");
86  break;
87  }
88  }
89  B2RESULT("Connected.");
90  }
91 
92  // Read from socket
93  if (raw) {
94  size = socket.get_wordbuf(buffer, MAXEVTSIZE);
95  } else {
96  size = socket.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
97  }
98  // Error checking socket
99  if (size == 0) {
100  B2ERROR("Error in receiving the event! Reconnecting.");
101  socket.deinitialize();
102  continue;
103  } else if (size < 0) {
104  if (mainLoop.isRunning()) {
105  B2ERROR("Error in receiving the event! Aborting.");
106  }
107  // This is fine if we are terminating anyways
108  break;
109  }
110  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
111 
112  if (not raw) {
113  // We want to have it in words, not bytes
114  int sizeInWords = ((size - 1) / sizeof(int) + 1);
115 
116  // However we have to make sure to pad the buffer correctly, as sizeInWords could be a larger buffer
117  unsigned int sizeRoundedUp = sizeInWords * sizeof(int);
118  auto charBuffer = reinterpret_cast<char*>(buffer);
119  for (unsigned int pos = size; pos < sizeRoundedUp; ++pos) {
120  charBuffer[pos] = 0;
121  }
122  size = sizeInWords;
123 
124  // Terminate messages make us terminate
125  EvtMessage message(reinterpret_cast<char*>(buffer));
126  if (message.type() == MSG_TERMINATE) {
127  B2RESULT("Having received terminate message");
128  terminate = true;
129  }
130  }
131 
132  // Monitoring
133  flow.log(size * sizeof(int));
134 
135  // Write to ring buffer
136  const int returnValue = mainLoop.writeToRingBufferWaiting(ringBuffer, buffer, size);
137  // Error check ring buffer
138  if (returnValue <= 0) {
139  if (mainLoop.isRunning()) {
140  B2ERROR("Writing to the ring buffer failed!");
141  }
142  // This is fine if we are terminating anyways
143  break;
144  }
145  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
146 
147  // Logging
148  nevt++;
149  if (nevt % 5000 == 0) {
150  B2RESULT("b2hlt_rb2socket event number: " << nevt);
151  }
152  }
153 
154  B2RESULT("Program terminated.");
155 }
Class to manage streamed object.
Definition: EvtMessage.h:59
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91