8 #include <daq/rfarm/manager/RFFlowStat.h> 
    9 #include <framework/pcore/RingBuffer.h> 
   10 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h> 
   11 #include <daq/rfarm/event/hltsocket/HLTSocket.h> 
   12 #include <framework/logging/Logger.h> 
   13 #include <framework/pcore/EvtMessage.h> 
   15 #include <boost/program_options.hpp> 
   18 #define MAXEVTSIZE 80000000 
   21 namespace po = boost::program_options;
 
   23 int main(
int argc, 
char* argv[])
 
   25   std::string ringBufferName;
 
   30   std::string sourceHost;
 
   32   po::options_description desc(
"b2hlt_socket2rb PORT RING-BUFFER-NAME SHM-NAME SHM-ID");
 
   34   (
"help,h", 
"Print this help message")
 
   35   (
"port,p", po::value<unsigned int>(&port)->required(), 
"port number to connect or listen to")
 
   36   (
"ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(), 
"name of the ring buffer")
 
   37   (
"shm-name,n", po::value<std::string>(&shmName)->required(), 
"name of the shm for flow output")
 
   38   (
"shm-id,i", po::value<unsigned int>(&shmID)->required(), 
"id in the shm for flow output")
 
   39   (
"connect-to,c", po::value<std::string>(&sourceHost), 
"connect to a given host instead of listening")
 
   40   (
"raw", po::bool_switch(&raw)->default_value(
false), 
"send and receive raw data instead of event buffers");
 
   42   po::positional_options_description p;
 
   43   p.add(
"port", 1).add(
"ring-buffer-name", 1).add(
"shm-name", 1).add(
"shm-id", 1);
 
   47     po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
 
   48   } 
catch (std::exception& e) {
 
   52   if (vm.count(
"help")) {
 
   53     std::cout << desc << std::endl;
 
   59   } 
catch (std::exception& e) {
 
   65   RFFlowStat flow((
char*)shmName.c_str(), shmID, ringBuffer);
 
   66   int* buffer = 
new int[MAXEVTSIZE];
 
   73   bool terminate = 
false;
 
   75   while (mainLoop.isRunning() and not terminate) {
 
   77     if (not socket.initialized()) {
 
   78       if (vm.count(
"connect-to")) {
 
   79         if (not socket.connect(sourceHost, port, mainLoop)) {
 
   80           B2ERROR(
"Could not reconnnect!");
 
   84         if (not socket.accept(port)) {
 
   85           B2ERROR(
"Could not reconnect!");
 
   89       B2RESULT(
"Connected.");
 
   94       size = socket.get_wordbuf(buffer, MAXEVTSIZE);
 
   96       size = socket.get(
reinterpret_cast<char*
>(buffer), MAXEVTSIZE);
 
  100       B2ERROR(
"Error in receiving the event! Reconnecting.");
 
  101       socket.deinitialize();
 
  103     } 
else if (size < 0) {
 
  104       if (mainLoop.isRunning()) {
 
  105         B2ERROR(
"Error in receiving the event! Aborting.");
 
  110     B2ASSERT(
"Size is negative! This should be handled above. Not good!", size > 0);
 
  114       int sizeInWords = ((size - 1) / 
sizeof(
int) + 1);
 
  117       unsigned int sizeRoundedUp = sizeInWords * 
sizeof(int);
 
  118       auto charBuffer = 
reinterpret_cast<char*
>(buffer);
 
  119       for (
unsigned int pos = size; pos < sizeRoundedUp; ++pos) {
 
  125       EvtMessage message(
reinterpret_cast<char*
>(buffer));
 
  126       if (message.type() == MSG_TERMINATE) {
 
  127         B2RESULT(
"Having received terminate message");
 
  133     flow.log(size * 
sizeof(
int));
 
  136     const int returnValue = mainLoop.writeToRingBufferWaiting(ringBuffer, buffer, size);
 
  138     if (returnValue <= 0) {
 
  139       if (mainLoop.isRunning()) {
 
  140         B2ERROR(
"Writing to the ring buffer failed!");
 
  145     B2ASSERT(
"Written size is negative! This should be handled above. Not good!", returnValue > 0);
 
  149     if (nevt % 5000 == 0) {
 
  150       B2RESULT(
"b2hlt_rb2socket event number: " << nevt);
 
  154   B2RESULT(
"Program terminated.");
 
Class to manage streamed object.
Class to manage a Ring Buffer placed in an IPC shared memory.
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.