Belle II Software  release-05-01-25
b2hlt_socket2rb.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Ryosuke Itoh, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/rfarm/manager/RFFlowStat.h>
11 #include <framework/pcore/RingBuffer.h>
12 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
13 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
14 #include <framework/logging/Logger.h>
15 #include <framework/pcore/EvtMessage.h>
16 
17 #include <boost/program_options.hpp>
18 #include <iostream>
19 
20 #define MAXEVTSIZE 80000000
21 
22 using namespace Belle2;
23 namespace po = boost::program_options;
24 
25 int main(int argc, char* argv[])
26 {
27  std::string ringBufferName;
28  unsigned int port;
29  std::string shmName;
30  unsigned int shmID;
31  bool raw;
32  std::string sourceHost;
33 
34  po::options_description desc("b2hlt_socket2rb PORT RING-BUFFER-NAME SHM-NAME SHM-ID");
35  desc.add_options()
36  ("help,h", "Print this help message")
37  ("port,p", po::value<unsigned int>(&port)->required(), "port number to connect or listen to")
38  ("ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(), "name of the ring buffer")
39  ("shm-name,n", po::value<std::string>(&shmName)->required(), "name of the shm for flow output")
40  ("shm-id,i", po::value<unsigned int>(&shmID)->required(), "id in the shm for flow output")
41  ("connect-to,c", po::value<std::string>(&sourceHost), "connect to a given host instead of listening")
42  ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers");
43 
44  po::positional_options_description p;
45  p.add("port", 1).add("ring-buffer-name", 1).add("shm-name", 1).add("shm-id", 1);
46 
47  po::variables_map vm;
48  try {
49  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
50  } catch (std::exception& e) {
51  B2FATAL(e.what());
52  }
53 
54  if (vm.count("help")) {
55  std::cout << desc << std::endl;
56  return 1;
57  }
58 
59  try {
60  po::notify(vm);
61  } catch (std::exception& e) {
62  B2FATAL(e.what());
63  }
64 
65  // TODO: delete or not?
66  RingBuffer* ringBuffer = new RingBuffer(ringBufferName.c_str());
67  RFFlowStat flow((char*)shmName.c_str(), shmID, ringBuffer);
68  int* buffer = new int[MAXEVTSIZE];
69 
70  HLTMainLoop mainLoop;
71 
72  HLTSocket socket;
73  int size;
74  int nevt = 0;
75  bool terminate = false;
76 
77  while (mainLoop.isRunning() and not terminate) {
78  // Connect socket if needed
79  if (not socket.initialized()) {
80  if (vm.count("connect-to")) {
81  if (not socket.connect(sourceHost, port, mainLoop)) {
82  B2ERROR("Could not reconnnect!");
83  break;
84  }
85  } else {
86  if (not socket.accept(port)) {
87  B2ERROR("Could not reconnect!");
88  break;
89  }
90  }
91  B2RESULT("Connected.");
92  }
93 
94  // Read from socket
95  if (raw) {
96  size = socket.get_wordbuf(buffer, MAXEVTSIZE);
97  } else {
98  size = socket.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
99  }
100  // Error checking socket
101  if (size == 0) {
102  B2ERROR("Error in receiving the event! Reconnecting.");
103  socket.deinitialize();
104  continue;
105  } else if (size < 0) {
106  if (mainLoop.isRunning()) {
107  B2ERROR("Error in receiving the event! Aborting.");
108  }
109  // This is fine if we are terminating anyways
110  break;
111  }
112  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
113 
114  if (not raw) {
115  // We want to have it in words, not bytes
116  int sizeInWords = ((size - 1) / sizeof(int) + 1);
117 
118  // However we have to make sure to pad the buffer correctly, as sizeInWords could be a larger buffer
119  unsigned int sizeRoundedUp = sizeInWords * sizeof(int);
120  auto charBuffer = reinterpret_cast<char*>(buffer);
121  for (int pos = size; pos < sizeRoundedUp; pos++) {
122  charBuffer[pos] = 0;
123  }
124  size = sizeInWords;
125 
126  // Terminate messages make us terminate
127  EvtMessage message(reinterpret_cast<char*>(buffer));
128  if (message.type() == MSG_TERMINATE) {
129  B2RESULT("Having received terminate message");
130  terminate = true;
131  }
132  }
133 
134  // Monitoring
135  flow.log(size * sizeof(int));
136 
137  // Write to ring buffer
138  const int returnValue = mainLoop.writeToRingBufferWaiting(ringBuffer, buffer, size);
139  // Error check ring buffer
140  if (returnValue <= 0) {
141  if (mainLoop.isRunning()) {
142  B2ERROR("Writing to the ring buffer failed!");
143  }
144  // This is fine if we are terminating anyways
145  break;
146  }
147  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
148 
149  // Logging
150  nevt++;
151  if (nevt % 5000 == 0) {
152  B2RESULT("b2hlt_rb2socket event number: " << nevt);
153  }
154  }
155 
156  B2RESULT("Program terminated.");
157 }
prepareAsicCrosstalkSimDB.e
e
aux.
Definition: prepareAsicCrosstalkSimDB.py:53
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::RFFlowStat
Definition: RFFlowStat.h:28
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::HLTMainLoop
Definition: HLTMainLoop.h:28
Belle2::HLTSocket
Definition: HLTSocket.h:13