Belle II Software development
EventSampler.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include "daq/expreco/EventSampler.h"
10
11#include <unistd.h>
12
13using namespace Belle2;
14using namespace std;
15
16// constructor/destructor
17
18EventSampler::EventSampler(vector<string> nodes, int port, string rbufname, int interval) : m_interval(interval)
19{
20 // Attach to output RingBuffer
21 m_rbuf = new RingBuffer(rbufname.c_str());
22
23 // Open EvtSocket
24 for (vector<string>::iterator it = nodes.begin(); it != nodes.end(); ++it) {
25 string& nodename = *it;
26 printf("EventSampler : connecting to %s (port %d)\n", nodename.c_str(), port);
27 EvtSocketSend* sock = new EvtSocketSend(nodename.c_str(), port);
28 if (sock == NULL) {
29 printf("EventSampler : error to connect to %s\n",
30 nodename.c_str());
31 } else {
32 m_socklist.push_back(sock);
33 }
34 fflush(stdout);
35 }
36 printf("EventSampler : init : socklist = %lu\n", m_socklist.size());
37 fflush(stdout);
38}
39
40EventSampler::~EventSampler()
41{
42 for (vector<EvtSocketSend*>::iterator it = m_socklist.begin();
43 it != m_socklist.end(); ++it) {
44 EvtSocketSend* sock = *it;
45 delete sock;
46 m_socklist.erase(it);
47 }
48 delete m_rbuf;
49}
50
51int EventSampler::server()
52{
53 // printf ( "EventSampler : server started\n" );
54 fflush(stdout);
55 int nsample = 0;
56 for (;;) {
57 for (vector<EvtSocketSend*>::iterator it = m_socklist.begin();
58 it != m_socklist.end(); ++it) {
59 // Receive an event from connected socket
60 EvtSocketSend* sock = *it;
61 // printf ( "EventSampler : receiving event from sock %s\n",
62 // (sock->sock())->node() );
63 EvtMessage* msg = sock->recv();
64 // printf ( "Event Sampler : got event : %d\n", msg->size() );
65 // fflush ( stdout );
66 if (msg == NULL) {
67 printf("EventSampler : Error to receive data\n");
68 return -1;
69 }
70 // Put the message in ring buffer. If full, just skip the event.
71 m_rbuf->insq((int*)msg->buffer(), msg->paddedSize());
72 delete msg;
73 nsample++;
74 if (nsample % 1000 == 0)
75 printf("EventSampler : %d events sampled and queued\n", nsample);
76 usleep(m_interval);
77 }
78 }
79}
80
Class to manage streamed object.
Definition: EvtMessage.h:59
int paddedSize() const
Same as size(), but as size of an integer array.
Definition: EvtMessage.cc:99
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:189
Abstract base class for different kinds of events.
STL namespace.