Belle II Software development
RingBuffer Class Reference

Class to manage a Ring Buffer placed in an IPC shared memory. More...

#include <RingBuffer.h>

Public Member Functions

 RingBuffer (int nwords=c_DefaultSize)
 Constructor to create a new shared memory in private space.
 
 RingBuffer (const std::string &name, unsigned int nwords=0)
 Constructor to create/attach named shared memory in global space.
 
 ~RingBuffer ()
 Destructor.
 
void openSHM (int nwords)
 open shared memory
 
void cleanup ()
 Function to detach and remove shared memory.
 
int insq (const int *buf, int size, bool checkTx=false)
 Append a buffer to the RingBuffer.
 
int remq (int *buf)
 Pick up a buffer from the RingBuffer.
 
int spyq (int *buf) const
 Prefetch a buffer from the RingBuffer w/o removing it.
 
int numq () const
 Returns number of entries/buffers in the RingBuffer.
 
void txAttached ()
 Increase the number of attached Tx counter.
 
void txDetached ()
 Decrease the number of attached Tx counter.
 
void kill ()
 Cause termination of reading processes (if they use isDead()).
 
bool isDead () const
 If True, the ring buffer is empty and has no attached Tx modules (i.e.
 
bool allRxWaiting () const
 True if and only if buffer is empty and nbusy == 0.
 
int clear ()
 Clear the RingBuffer.
 
void forceClear ()
 Forcefully clear the RingBuffer with resetting semaphore.
 
int tryClear ()
 Clear the RingBuffer, if the semaphore isn't locked at the moment.
 
int shmid () const
 Return ID of the shared memory.
 
void dump_db ()
 Print some info on the RingBufInfo structure.
 
int ninsq () const
 Return number of insq() calls for current buffer.
 
int nremq () const
 Return number of remq() calls for current buffer.
 
int insq_counter () const
 Return number of insq() calls.
 
int remq_counter () const
 Return number of remq() calls.
 
void dumpInfo () const
 Dump contents of RingBufInfo metadata.
 

Static Public Attributes

static const int c_DefaultSize = 15000000
 Standard size of buffer, in integers (~60MB).
 

Private Attributes

bool m_new {true}
 True if we created the ring buffer ourselves (and need to clean it).
 
bool m_file {false}
 True if m_pathfd needs to be closed.
 
std::string m_pathname {""}
 Path for identifying shared memory if named ring buffer is created.
 
int m_pathfd { -1}
 Associated file descriptor.
 
key_t m_shmkey {IPC_PRIVATE}
 SHM key, see shmget(2).
 
key_t m_semkey {IPC_PRIVATE}
 Semaphore key, see semget(2).
 
std::string m_semshmFileName {""}
 file path containing ids of shm and sema for private shared mem, used for easier cleanup if we fail to do things properly
 
bool m_procIsBusy {false}
 Is this process currently processing events from this RingBuffer?
 
int m_shmid { -1}
 ID of shared memory segment.
 
int * m_shmadr {nullptr}
 Address of attached shared memory segment.
 
int m_shmsize { -1}
 Size of shared memory segment, in bytes.
 
struct RingBufInfom_bufinfo {nullptr}
 structure to manage ring buffer.
 
int * m_buftop {nullptr}
 Points to memory after the end of m_bufinfo.
 
int m_semid { -1}
 Semaphore ID.
 
int m_remq_counter {0}
 count remq() calls.
 
int m_insq_counter {0}
 count insq() calls.
 

Detailed Description

Class to manage a Ring Buffer placed in an IPC shared memory.

Definition at line 39 of file RingBuffer.h.

Constructor & Destructor Documentation

◆ RingBuffer() [1/2]

RingBuffer ( int  nwords = c_DefaultSize)
explicit

Constructor to create a new shared memory in private space.

Parameters
nwordsRing buffer size in integers

Definition at line 32 of file RingBuffer.cc.

33{
34 openSHM(size);
35 B2DEBUG(32, "RingBuffer initialization done");
36}
void openSHM(int nwords)
open shared memory
Definition: RingBuffer.cc:84

◆ RingBuffer() [2/2]

RingBuffer ( const std::string &  name,
unsigned int  nwords = 0 
)
explicit

Constructor to create/attach named shared memory in global space.

Definition at line 39 of file RingBuffer.cc.

40{
41 // 0. Determine shared memory type
42 if (name != "private") { // Global
43 m_file = true;
44 m_pathname = std::string(std::filesystem::temp_directory_path() / getenv("USER")) + "_RB_" + name;
45 m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
46 if (m_pathfd > 0) { // a new shared memory file created
47 B2DEBUG(32, "[RingBuffer] Creating a ring buffer with key " << name);
48 m_new = true;
49 } else if (m_pathfd == -1 && errno == EEXIST) { // shm already there
50 B2DEBUG(32, "[RingBuffer] Attaching the ring buffer with key " << name);
51 m_new = false;
52 } else {
53 B2FATAL("RingBuffer: error opening shm file: " << m_pathname);
54 }
55 m_shmkey = ftok(m_pathname.c_str(), 1);
56 m_semkey = ftok(m_pathname.c_str(), 2);
57 } else { // Private
58 B2DEBUG(32, "[RingBuffer] Opening private ring buffer");
59 }
60
61 openSHM(nwords);
62
63 if (m_pathfd > 0) {
64 B2DEBUG(32, "First global RingBuffer creation: writing SHM info to file.");
65 char rbufinfo[256];
66 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_shmid);
67 int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
68 if (is < 0) perror("write");
69 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_semid);
70 is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
71 if (is < 0) perror("write");
72 close(m_pathfd);
73 }
74
75
76 B2DEBUG(32, "RingBuffer initialization done with shm=" << m_shmid);
77}
bool m_file
True if m_pathfd needs to be closed.
Definition: RingBuffer.h:111
key_t m_shmkey
SHM key, see shmget(2).
Definition: RingBuffer.h:114
int m_semid
Semaphore ID.
Definition: RingBuffer.h:131
int m_shmid
ID of shared memory segment.
Definition: RingBuffer.h:126
int m_pathfd
Associated file descriptor.
Definition: RingBuffer.h:113
std::string m_pathname
Path for identifying shared memory if named ring buffer is created.
Definition: RingBuffer.h:112
bool m_new
True if we created the ring buffer ourselves (and need to clean it).
Definition: RingBuffer.h:110
key_t m_semkey
Semaphore key, see semget(2).
Definition: RingBuffer.h:115

◆ ~RingBuffer()

~RingBuffer ( )

Destructor.

Definition at line 79 of file RingBuffer.cc.

80{
81 cleanup();
82}
void cleanup()
Function to detach and remove shared memory.
Definition: RingBuffer.cc:162

Member Function Documentation

◆ allRxWaiting()

bool allRxWaiting ( ) const

True if and only if buffer is empty and nbusy == 0.

Called in Tx to see if all events of the current run have been processed

Definition at line 400 of file RingBuffer.cc.

401{
402 SemaphoreLocker locker(m_semid);
403 return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
404}
struct RingBufInfo * m_bufinfo
structure to manage ring buffer.
Definition: RingBuffer.h:129
Handles creation, locking and unlocking of System V semaphores.
int nbusy
Number of attached reading processes currently processing events.
Definition: RingBuffer.h:30
int nbuf
Number of entries in ring buffer.
Definition: RingBuffer.h:27

◆ cleanup()

void cleanup ( )

Function to detach and remove shared memory.

Definition at line 162 of file RingBuffer.cc.

163{
164 if (m_procIsBusy) {
165 SemaphoreLocker locker(m_semid);
166 m_bufinfo->nbusy--;
167 m_procIsBusy = false;
168 }
169
170 shmdt(m_shmadr);
171 B2DEBUG(32, "RingBuffer: Cleaning up IPC");
172 if (m_new) {
173 shmctl(m_shmid, IPC_RMID, (struct shmid_ds*) nullptr);
175 if (m_file) {
176 unlink(m_pathname.c_str());
177 }
178 unlink(m_semshmFileName.c_str());
179 }
180}
bool m_procIsBusy
Is this process currently processing events from this RingBuffer?
Definition: RingBuffer.h:124
std::string m_semshmFileName
file path containing ids of shm and sema for private shared mem, used for easier cleanup if we fail t...
Definition: RingBuffer.h:117
int * m_shmadr
Address of attached shared memory segment.
Definition: RingBuffer.h:127
static void destroy(int semId)
Destroy the given semaphore.

◆ clear()

int clear ( )

Clear the RingBuffer.

Definition at line 426 of file RingBuffer.cc.

427{
428 SemaphoreLocker locker(m_semid);
429 // m_bufinfo->size = m_shmsize - sizeof ( struct RingBufInfo );
431 m_bufinfo->wptr = 0;
432 m_bufinfo->prevwptr = 0;
433 m_bufinfo->rptr = 0;
434 m_bufinfo->nbuf = 0;
435 m_bufinfo->ninsq = 0;
436 m_bufinfo->nremq = 0;
437
438 return 0;
439}
int wptr
Pointer for writing entries.
Definition: RingBuffer.h:24
int size
ring buffer size (integers), minus this header.
Definition: RingBuffer.h:22
int nremq
Count remq() calls for this buffer.
Definition: RingBuffer.h:35
int rptr
Pointer for reading entries.
Definition: RingBuffer.h:26
int remain
Unsure, always equal to size.
Definition: RingBuffer.h:23
int prevwptr
Previous state of wptr (for error recovery).
Definition: RingBuffer.h:25
int ninsq
Count insq() calls for this buffer.
Definition: RingBuffer.h:34

◆ dump_db()

void dump_db ( )

Print some info on the RingBufInfo structure.

Definition at line 182 of file RingBuffer.cc.

183{
184 printf("bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
187}

◆ dumpInfo()

void dumpInfo ( ) const

Dump contents of RingBufInfo metadata.

Definition at line 463 of file RingBuffer.cc.

464{
465 SemaphoreLocker locker(m_semid);
466
467 // Dump control parameters
468 printf("***** Ring Buffer Information ***\n");
469 printf("path = %s\n", m_pathname.c_str());
470 printf("shmsize = %d\n", m_shmsize);
471 printf("[Buffer Info]\n");
472 printf("bufsize = %d\n", m_bufinfo->size);
473 printf("remain = %d\n", m_bufinfo->remain);
474 printf("wptr = %d\n", m_bufinfo->wptr);
475 printf("prevwptr = %d\n", m_bufinfo->prevwptr);
476 printf("rptr = %d\n", m_bufinfo->rptr);
477 printf("nbuf = %d\n", m_bufinfo->nbuf);
478 printf("nattached = %d\n", m_bufinfo->nattached);
479 printf("nbusy = %d\n", m_bufinfo->nbusy);
480 printf("errtype = %d\n", m_bufinfo->errtype);
481 printf("numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
482 printf("ninsq = %d\n", m_bufinfo->ninsq);
483 printf("nremq = %d\n", m_bufinfo->ninsq);
484}
int m_shmsize
Size of shared memory segment, in bytes.
Definition: RingBuffer.h:128
int numAttachedTx
number of attached sending processes.
Definition: RingBuffer.h:33
int errtype
Error state? 0: Normal, 1: buffer full and wptr>rptr, others are complicated.
Definition: RingBuffer.h:32
int nattached
Number of RingBuffer instances currently attached to this buffer.
Definition: RingBuffer.h:29

◆ forceClear()

void forceClear ( )

Forcefully clear the RingBuffer with resetting semaphore.

Definition at line 441 of file RingBuffer.cc.

442{
443 int val = 1;
444 if (semctl(m_semid, 0, SETVAL, val) == -1) { //set 0th semaphore to semval
445 B2ERROR("Initializing semaphore with semctl() failed.");
446 }
447 clear();
448}
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426

◆ insq()

int insq ( const int *  buf,
int  size,
bool  checkTx = false 
)

Append a buffer to the RingBuffer.

Definition at line 189 of file RingBuffer.cc.

190{
191 if (size <= 0) {
192 B2FATAL("RingBuffer::insq() failed: invalid buffer size = " << size);
193 }
194 if (m_bufinfo->numAttachedTx == 0 and checkTx) {
195 //safe abort was requested
196 B2WARNING("Number of attached Tx is 0, so I will not go on with the processing.");
197 exit(0);
198 }
199 SemaphoreLocker locker(m_semid);
200 if (m_bufinfo->nbuf == 0) {
201 m_bufinfo->wptr = 0;
202 m_bufinfo->rptr = 0;
203 if (size > m_bufinfo->size + 2) {
204 throw std::runtime_error("[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
205 ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) + ")!");
206 }
207 m_bufinfo->errtype = 0;
208 int* wptr = m_buftop + m_bufinfo->wptr;
209 *wptr = size;
210 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
211 memcpy(wptr + 2, buf, size * sizeof(int));
213 m_bufinfo->wptr += (size + 2);
214 m_bufinfo->nbuf++;
215 m_bufinfo->ninsq++;
217 return size;
218 } else if (m_bufinfo->wptr > m_bufinfo->rptr) {
219 if (m_bufinfo->errtype != 4 &&
220 m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
221 B2ERROR("insq: Error in errtype 0; current=" << m_bufinfo->errtype);
222 return -1;
223 }
224 if (m_bufinfo->errtype == 3) {
225 // printf ( "---> errtype is 3, still remaining buffers\n" );
226 B2DEBUG(32, "[RingBuffer] errtype 3");
227 return -1;
228 } else if (m_bufinfo->errtype == 4) {
229 // printf ( "---> errtype returned to 0, wptr=%d, rptr=%d\n",
230 // m_bufinfo->wptr, m_bufinfo->rptr );
231 }
232 m_bufinfo->errtype = 0;
233 if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) { // normal case
234 int* wptr = m_buftop + m_bufinfo->wptr;
235 *wptr = size;
236 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
237 memcpy(wptr + 2, buf, size * sizeof(int));
239 m_bufinfo->wptr += (size + 2);
240 m_bufinfo->nbuf++;
241 m_bufinfo->ninsq++;
243 return size;
244 } else {
245 if (m_bufinfo->rptr >= size + 2) { // buffer full and wptr>rptr
246 // this should be dead code but I don't understand it well enough to delete it
247 if (m_bufinfo->errtype != 0) {
248 B2ERROR("insq: Error in errtype 1; current=" << m_bufinfo->errtype);
249 return -1;
250 }
251 m_bufinfo->errtype = 1;
252 int* wptr = m_buftop;
253 memcpy(wptr + 2, buf, size * sizeof(int));
254 *wptr = size;
255 *(wptr + 1) = size + 2;
256 m_bufinfo->wptr = *(wptr + 1);
257 int* prevptr = m_buftop + m_bufinfo->prevwptr;
258 *(prevptr + 1) = 0;
259 m_bufinfo->prevwptr = 0;
260 if (m_bufinfo->nbuf == 0) {
261 m_bufinfo->errtype = 4;
262 m_bufinfo->rptr = 0;
263 }
264 m_bufinfo->nbuf++;
265 // printf ( "insq: no more space, space below rptr; prev=%d, new=%d\n",
266 // m_bufinfo->prevwptr, m_bufinfo->wptr );
267 m_bufinfo->ninsq++;
269 return size;
270 } else {
271 // printf ( "insq: wptr>rptr, no more space, no space below rptr(%d), readbuf=%d\n",
272 // m_bufinfo->rptr, m_bufinfo->readbuf );
273 return -1;
274 }
275 }
276 } else { // wptr < rptr
277 if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
278 size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
279 if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
280 m_bufinfo->errtype != 3) {
281 B2ERROR("insq: Error in errtype 2; current=" << m_bufinfo->errtype);
282 return -1;
283 }
284 m_bufinfo->errtype = 2;
285 int* wptr = m_buftop + m_bufinfo->wptr;
286 *wptr = size;
287 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
288 memcpy(wptr + 2, buf, size * sizeof(int));
290 m_bufinfo->wptr += (size + 2);
291 m_bufinfo->nbuf++;
292 // printf ( "insq: wptr<rptr and enough space below rptr; curr=%d, next=%d, rptr=%d\n", m_bufinfo->prevwptr, m_bufinfo->wptr, m_bufinfo->rptr );
293 if (m_bufinfo->wptr > m_bufinfo->rptr) {
294 B2DEBUG(32, "next pointer will exceed rptr.....");
295 m_bufinfo->errtype = 3;
296 }
297 m_bufinfo->ninsq++;
299 return size;
300 } else {
301 // printf ( "insq: wptr<rptr; no more space below rptr(%d), wptr(%d)\n",
302 // m_bufinfo->rptr, m_bufinfo->wptr );
303 return -1;
304 }
305 }
306}
int * m_buftop
Points to memory after the end of m_bufinfo.
Definition: RingBuffer.h:130
int m_insq_counter
count insq() calls.
Definition: RingBuffer.h:133

◆ insq_counter()

int insq_counter ( ) const

Return number of insq() calls.

Definition at line 416 of file RingBuffer.cc.

417{
418 return m_insq_counter;
419}

◆ isDead()

bool isDead ( ) const

If True, the ring buffer is empty and has no attached Tx modules (i.e.

no new data is going to be added). Processes should then stop.

Definition at line 394 of file RingBuffer.cc.

395{
396 SemaphoreLocker locker(m_semid);
397 //NOTE: numAttachedTx == -1 also means we should read data (i.e. initialization pending)
398 return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
399}

◆ kill()

void kill ( )

Cause termination of reading processes (if they use isDead()).

Assumed to be atomic.

Definition at line 389 of file RingBuffer.cc.

390{
392 m_bufinfo->nbuf = 0;
393}

◆ ninsq()

int ninsq ( ) const

Return number of insq() calls for current buffer.

Definition at line 406 of file RingBuffer.cc.

407{
408 return m_bufinfo->ninsq;
409}

◆ nremq()

int nremq ( ) const

Return number of remq() calls for current buffer.

Definition at line 411 of file RingBuffer.cc.

412{
413 return m_bufinfo->nremq;
414}

◆ numq()

int numq ( ) const

Returns number of entries/buffers in the RingBuffer.

Definition at line 368 of file RingBuffer.cc.

369{
370 return m_bufinfo->nbuf;
371}

◆ openSHM()

void openSHM ( int  nwords)

open shared memory

Definition at line 84 of file RingBuffer.cc.

85{
86 // 1. Open shared memory
87 unsigned int sizeBytes = nwords * sizeof(int);
88 const auto mode = IPC_CREAT | 0644;
89 m_shmid = shmget(m_shmkey, sizeBytes, mode);
90 if (m_shmid < 0) {
91 unsigned int maxSizeBytes = sizeBytes;
92 ifstream shmax("/proc/sys/kernel/shmmax");
93 if (shmax.good())
94 shmax >> maxSizeBytes;
95 shmax.close();
96
97 B2WARNING("RingBuffer: shmget(" << sizeBytes << ") failed, limiting to system maximum: " << maxSizeBytes);
98 sizeBytes = maxSizeBytes;
99 nwords = maxSizeBytes / sizeof(int);
100 m_shmid = shmget(m_shmkey, sizeBytes, mode);
101 if (m_shmid < 0) {
102 B2FATAL("RingBuffer: shmget(" << sizeBytes <<
103 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
104 }
105 }
106 m_shmadr = (int*) shmat(m_shmid, nullptr, 0);
107 if (m_shmadr == (int*) - 1) {
108 B2FATAL("RingBuffer: Attaching to shared memory segment via shmat() failed");
109 }
110
111 // 2. Open Semaphore
113 if (m_semid < 0) {
114 cleanup();
115 B2FATAL("Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
116 }
117 SemaphoreLocker locker(m_semid); //prevent simultaneous initialization
118
119 // 3. Initialize control parameters
120 m_shmsize = nwords;
121 m_bufinfo = reinterpret_cast<RingBufInfo*>(m_shmadr);
122 m_buftop = m_shmadr + sizeof(struct RingBufInfo);
123 if (m_new) {
124 m_bufinfo->size = m_shmsize - sizeof(struct RingBufInfo);
126 m_bufinfo->wptr = 0;
127 m_bufinfo->prevwptr = 0;
128 m_bufinfo->rptr = 0;
129 m_bufinfo->nbuf = 0;
131 m_bufinfo->nattached = 1;
132 m_bufinfo->nbusy = 0;
133 m_bufinfo->errtype = 0;
135 m_bufinfo->ninsq = 0;
136 m_bufinfo->nremq = 0;
137 } else {
139
140 B2DEBUG(32, "[RingBuffer] check entries = " << m_bufinfo->nbuf);
141 B2DEBUG(32, "[RingBuffer] check size = " << m_bufinfo->size);
142 }
143
144 m_remq_counter = 0;
145 m_insq_counter = 0;
146
147 if (m_new) {
148 // Leave id of shm and semaphore in file name
149 m_semshmFileName = std::string(std::filesystem::temp_directory_path() / "SHM") + to_string(m_shmid) + "-SEM" + to_string(
150 m_semid) + "-UNNAMED";
151 int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
152 if (fd < 0) {
153 B2WARNING("RingBuffer ID file could not be created.");
154 } else {
155 close(fd);
156 }
157 }
158
159 B2DEBUG(35, "buftop = " << m_buftop << ", end = " << (m_buftop + m_bufinfo->size));
160}
int m_remq_counter
count remq() calls.
Definition: RingBuffer.h:132
static int create(key_t semkey)
Create a new semaphore and initialize it.
Internal metadata structure for RingBuffer.
Definition: RingBuffer.h:21
int semid
Semaphore ID.
Definition: RingBuffer.h:28

◆ remq()

int remq ( int *  buf)

Pick up a buffer from the RingBuffer.

Definition at line 308 of file RingBuffer.cc.

309{
310 SemaphoreLocker locker(m_semid);
311 if (m_bufinfo->nbuf < 0) {
312 throw std::runtime_error("[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
313 }
314 if (m_bufinfo->nbuf == 0) {
315 if (m_procIsBusy) {
316 m_bufinfo->nbusy--;
317 m_procIsBusy = false;
318 }
319 return 0;
320 }
321 int* r_ptr = m_buftop + m_bufinfo->rptr;
322 int nw = *r_ptr;
323 if (nw <= 0) {
324 printf("RingBuffer::remq : buffer size = %d, skipped\n", nw);
325 printf("RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
326 if (m_procIsBusy) {
327 m_bufinfo->nbusy--;
328 m_procIsBusy = false;
329 }
330 return 0;
331 }
332 if (buf)
333 memcpy(buf, r_ptr + 2, nw * sizeof(int));
334 m_bufinfo->rptr = *(r_ptr + 1);
335 if (m_bufinfo->rptr == 0)
336 m_bufinfo->errtype = 4;
337 m_bufinfo->nbuf--;
338 m_bufinfo->nremq++;
340
341 if (not m_procIsBusy) {
342 m_bufinfo->nbusy++;
343 m_procIsBusy = true;
344 }
345 return nw;
346}

◆ remq_counter()

int remq_counter ( ) const

Return number of remq() calls.

Definition at line 421 of file RingBuffer.cc.

422{
423 return m_remq_counter;
424}

◆ shmid()

int shmid ( ) const

Return ID of the shared memory.

Definition at line 458 of file RingBuffer.cc.

459{
460 return m_shmid;
461}

◆ spyq()

int spyq ( int *  buf) const

Prefetch a buffer from the RingBuffer w/o removing it.

Definition at line 348 of file RingBuffer.cc.

349{
350 SemaphoreLocker locker(m_semid);
351 if (m_bufinfo->nbuf <= 0) {
352 return 0;
353 }
354 int* r_ptr = m_buftop + m_bufinfo->rptr;
355 int nw = *r_ptr;
356 if (nw <= 0) {
357 printf("RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358 printf("RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
359 return 0;
360 }
361 // printf ( "remq : taking buf from %d(%d)\n", m_bufinfo->rptr, nw );
362 // Copy buffer without modifying management parameters.
363 memcpy(buf, r_ptr + 2, nw * sizeof(int));
364 // Exit
365 return nw;
366}

◆ tryClear()

int tryClear ( )

Clear the RingBuffer, if the semaphore isn't locked at the moment.

See SemaphoreLocker::isLocked() for details

Definition at line 450 of file RingBuffer.cc.

451{
453 return 0;
454
455 return clear();
456}
static bool isLocked(int semId)
Return true if the given semaphore is locked.

◆ txAttached()

void txAttached ( )

Increase the number of attached Tx counter.

Definition at line 373 of file RingBuffer.cc.

374{
375 SemaphoreLocker locker(m_semid);
376 if (m_bufinfo->numAttachedTx == -1) //first attach
378
380}

◆ txDetached()

void txDetached ( )

Decrease the number of attached Tx counter.

Definition at line 381 of file RingBuffer.cc.

382{
383 SemaphoreLocker locker(m_semid);
385 if (m_bufinfo->numAttachedTx < 0) {
387 }
388}

Member Data Documentation

◆ c_DefaultSize

const int c_DefaultSize = 15000000
static

Standard size of buffer, in integers (~60MB).

Needs to be large enough to contain any event, but adds to total memory use of basf2.

Definition at line 42 of file RingBuffer.h.

◆ m_bufinfo

struct RingBufInfo* m_bufinfo {nullptr}
private

structure to manage ring buffer.

Placed on top of the shared memory.

Definition at line 129 of file RingBuffer.h.

◆ m_buftop

int* m_buftop {nullptr}
private

Points to memory after the end of m_bufinfo.

Definition at line 130 of file RingBuffer.h.

◆ m_file

bool m_file {false}
private

True if m_pathfd needs to be closed.

Definition at line 111 of file RingBuffer.h.

◆ m_insq_counter

int m_insq_counter {0}
private

count insq() calls.

Definition at line 133 of file RingBuffer.h.

◆ m_new

bool m_new {true}
private

True if we created the ring buffer ourselves (and need to clean it).

Definition at line 110 of file RingBuffer.h.

◆ m_pathfd

int m_pathfd { -1}
private

Associated file descriptor.

Definition at line 113 of file RingBuffer.h.

◆ m_pathname

std::string m_pathname {""}
private

Path for identifying shared memory if named ring buffer is created.

Definition at line 112 of file RingBuffer.h.

◆ m_procIsBusy

bool m_procIsBusy {false}
private

Is this process currently processing events from this RingBuffer?

set during remq() with value depending on wether data was returned. Always false for a process that is only using insq().

Definition at line 124 of file RingBuffer.h.

◆ m_remq_counter

int m_remq_counter {0}
private

count remq() calls.

Definition at line 132 of file RingBuffer.h.

◆ m_semid

int m_semid { -1}
private

Semaphore ID.

Definition at line 131 of file RingBuffer.h.

◆ m_semkey

key_t m_semkey {IPC_PRIVATE}
private

Semaphore key, see semget(2).

Definition at line 115 of file RingBuffer.h.

◆ m_semshmFileName

std::string m_semshmFileName {""}
private

file path containing ids of shm and sema for private shared mem, used for easier cleanup if we fail to do things properly

Definition at line 117 of file RingBuffer.h.

◆ m_shmadr

int* m_shmadr {nullptr}
private

Address of attached shared memory segment.

(See shmat(2))

Definition at line 127 of file RingBuffer.h.

◆ m_shmid

int m_shmid { -1}
private

ID of shared memory segment.

(See shmget(2))

Definition at line 126 of file RingBuffer.h.

◆ m_shmkey

key_t m_shmkey {IPC_PRIVATE}
private

SHM key, see shmget(2).

Definition at line 114 of file RingBuffer.h.

◆ m_shmsize

int m_shmsize { -1}
private

Size of shared memory segment, in bytes.

Definition at line 128 of file RingBuffer.h.


The documentation for this class was generated from the following files: