Belle II Software  release-08-01-10
EventSampler.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include "daq/expreco/EventSampler.h"
10 
11 #include <unistd.h>
12 
13 using namespace Belle2;
14 using namespace std;
15 
16 // constructor/destructor
17 
18 EventSampler::EventSampler(vector<string> nodes, int port, string rbufname, int interval) : m_port(port), m_interval(interval)
19 {
20  // Attach to output RingBuffer
21  m_rbuf = new RingBuffer(rbufname.c_str());
22 
23  // Open EvtSocket
24  for (vector<string>::iterator it = nodes.begin(); it != nodes.end(); ++it) {
25  string& nodename = *it;
26  printf("EventSampler : connecting to %s (port %d)\n", nodename.c_str(), port);
27  EvtSocketSend* sock = new EvtSocketSend(nodename.c_str(), port);
28  if (sock == NULL) {
29  printf("EventSampler : error to connect to %s\n",
30  nodename.c_str());
31  } else {
32  m_socklist.push_back(sock);
33  }
34  fflush(stdout);
35  }
36  printf("EventSampler : init : socklist = %lu\n", m_socklist.size());
37  fflush(stdout);
38 }
39 
40 EventSampler::~EventSampler()
41 {
42  for (vector<EvtSocketSend*>::iterator it = m_socklist.begin();
43  it != m_socklist.end(); ++it) {
44  EvtSocketSend* sock = *it;
45  delete sock;
46  m_socklist.erase(it);
47  }
48  delete m_rbuf;
49 }
50 
51 int EventSampler::server()
52 {
53  // printf ( "EventSampler : server started\n" );
54  fflush(stdout);
55  int nsample = 0;
56  for (;;) {
57  for (vector<EvtSocketSend*>::iterator it = m_socklist.begin();
58  it != m_socklist.end(); ++it) {
59  // Receive an event from connected socket
60  EvtSocketSend* sock = *it;
61  // printf ( "EventSampler : receiving event from sock %s\n",
62  // (sock->sock())->node() );
63  EvtMessage* msg = sock->recv();
64  // printf ( "Event Sampler : got event : %d\n", msg->size() );
65  // fflush ( stdout );
66  if (msg == NULL) {
67  printf("EventSampler : Error to receive data\n");
68  return -1;
69  }
70  // Put the message in ring buffer. If full, just skip the event.
71  m_rbuf->insq((int*)msg->buffer(), msg->paddedSize());
72  delete msg;
73  nsample++;
74  if (nsample % 1000 == 0)
75  printf("EventSampler : %d events sampled and queued\n", nsample);
76  usleep(m_interval);
77  }
78  }
79 }
80 
Class to manage streamed object.
Definition: EvtMessage.h:59
int paddedSize() const
Same as size(), but as size of an integer array.
Definition: EvtMessage.cc:99
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:189
Abstract base class for different kinds of events.