Belle II Software development
RingBuffer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/RingBuffer.h>
10#include <framework/pcore/SemaphoreLocker.h>
11#include <framework/logging/Logger.h>
12
13#include <sys/ipc.h>
14#include <sys/shm.h>
15
16#include <cerrno>
17#include <cstdio>
18#include <unistd.h>
19#include <cstring>
20#include <fcntl.h>
21#include <cstdlib>
22#include <sys/sem.h>
23
24#include <filesystem>
25#include <fstream>
26#include <stdexcept>
27
28using namespace std;
29using namespace Belle2;
30
31// Constructor of Private Ringbuffer
33{
34 openSHM(size);
35 B2DEBUG(32, "RingBuffer initialization done");
36}
37
38// Constructor of Global Ringbuffer with name
39RingBuffer::RingBuffer(const std::string& name, unsigned int nwords)
40{
41 // 0. Determine shared memory type
42 if (name != "private") { // Global
43 m_file = true;
44 m_pathname = std::string(std::filesystem::temp_directory_path() / getenv("USER")) + "_RB_" + name;
45 m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
46 if (m_pathfd > 0) { // a new shared memory file created
47 B2DEBUG(32, "[RingBuffer] Creating a ring buffer with key " << name);
48 m_new = true;
49 } else if (m_pathfd == -1 && errno == EEXIST) { // shm already there
50 B2DEBUG(32, "[RingBuffer] Attaching the ring buffer with key " << name);
51 m_new = false;
52 } else {
53 B2FATAL("RingBuffer: error opening shm file: " << m_pathname);
54 }
55 m_shmkey = ftok(m_pathname.c_str(), 1);
56 m_semkey = ftok(m_pathname.c_str(), 2);
57 } else { // Private
58 B2DEBUG(32, "[RingBuffer] Opening private ring buffer");
59 }
60
61 openSHM(nwords);
62
63 if (m_pathfd > 0) {
64 B2DEBUG(32, "First global RingBuffer creation: writing SHM info to file.");
65 char rbufinfo[256];
66 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_shmid);
67 int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
68 if (is < 0) perror("write");
69 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_semid);
70 is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
71 if (is < 0) perror("write");
72 close(m_pathfd);
73 }
74
75
76 B2DEBUG(32, "RingBuffer initialization done with shm=" << m_shmid);
77}
78
80{
81 cleanup();
82}
83
// Obtain and attach the shared memory segment for this ring buffer, check the
// semaphore ID, and set up the RingBufInfo control structure that sits at the
// start of the segment.
//
// nwords: requested segment size in units of int. If shmget() fails for that
//         size, the value read from /proc/sys/kernel/shmmax is tried instead
//         and nwords is reduced accordingly.
//
// Calls B2FATAL if the segment cannot be obtained or attached, or if m_semid
// is negative.
//
// NOTE(review): this listing has gaps in the original line numbering (112,
// 125, 130, 134, 138); the statement that assigns m_semid and some field
// initializations are therefore not visible here.
84void RingBuffer::openSHM(int nwords)
85{
86 // 1. Open shared memory
 // NOTE(review): the byte count is held in an unsigned int, so a very large
 // nwords would wrap -- confirm callers stay well below 2^32 bytes.
87 unsigned int sizeBytes = nwords * sizeof(int);
88 const auto mode = IPC_CREAT | 0644;
89 m_shmid = shmget(m_shmkey, sizeBytes, mode);
90 if (m_shmid < 0) {
 // Retry with the kernel limit. If the file cannot be read, maxSizeBytes
 // keeps the requested size and the retry uses the same size again.
 // NOTE(review): shmmax is parsed into an unsigned int; confirm behaviour on
 // systems where the limit does not fit into 32 bits.
91 unsigned int maxSizeBytes = sizeBytes;
92 ifstream shmax("/proc/sys/kernel/shmmax");
93 if (shmax.good())
94 shmax >> maxSizeBytes;
95 shmax.close();
96
97 B2WARNING("RingBuffer: shmget(" << sizeBytes << ") failed, limiting to system maximum: " << maxSizeBytes);
98 sizeBytes = maxSizeBytes;
99 nwords = maxSizeBytes / sizeof(int);
100 m_shmid = shmget(m_shmkey, sizeBytes, mode);
101 if (m_shmid < 0) {
102 B2FATAL("RingBuffer: shmget(" << sizeBytes <<
103 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
104 }
105 }
106 m_shmadr = (int*) shmat(m_shmid, nullptr, 0);
107 if (m_shmadr == (int*) - 1) {
108 B2FATAL("RingBuffer: Attaching to shared memory segment via shmat() failed");
109 }
110
111 // 2. Open Semaphore
113 if (m_semid < 0) {
114 cleanup();
115 B2FATAL("Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
116 }
117 SemaphoreLocker locker(m_semid); //prevent simultaneous initialization
118
119 // 3. Initialize control parameters
 // m_shmsize is kept in units of int (it is set from nwords, not sizeBytes).
120 m_shmsize = nwords;
121 m_bufinfo = reinterpret_cast<RingBufInfo*>(m_shmadr);
 // m_shmadr is an int*, so this advances by sizeof(RingBufInfo) ints (not
 // bytes). The same count is subtracted from the size below, so the data area
 // ends at m_shmadr + m_shmsize.
122 m_buftop = m_shmadr + sizeof(struct RingBufInfo);
123 if (m_new) {
 // This process created the buffer: reset the control structure.
124 m_bufinfo->size = m_shmsize - sizeof(struct RingBufInfo);
126 m_bufinfo->wptr = 0;
127 m_bufinfo->prevwptr = 0;
128 m_bufinfo->rptr = 0;
129 m_bufinfo->nbuf = 0;
131 m_bufinfo->nattached = 1;
132 m_bufinfo->nbusy = 0;
133 m_bufinfo->errtype = 0;
135 m_bufinfo->ninsq = 0;
136 m_bufinfo->nremq = 0;
137 } else {
139
 // Attaching to an existing buffer: the visible code only logs its state.
140 B2DEBUG(32, "[RingBuffer] check entries = " << m_bufinfo->nbuf);
141 B2DEBUG(32, "[RingBuffer] check size = " << m_bufinfo->size);
142 }
143
 // Per-instance call counters (not stored in the shared segment).
144 m_remq_counter = 0;
145 m_insq_counter = 0;
146
147 if (m_new) {
148 // Leave id of shm and semaphore in file name
 // The file is created empty; only its name carries the two IDs. Failure to
 // create it is reported as a warning and otherwise ignored.
149 m_semshmFileName = std::string(std::filesystem::temp_directory_path() / "SHM") + to_string(m_shmid) + "-SEM" + to_string(
150 m_semid) + "-UNNAMED";
151 int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
152 if (fd < 0) {
153 B2WARNING("RingBuffer ID file could not be created.");
154 } else {
155 close(fd);
156 }
157 }
158
159 B2DEBUG(35, "buftop = " << m_buftop << ", end = " << (m_buftop + m_bufinfo->size));
160}
161
163{
164 if (m_procIsBusy) {
165 SemaphoreLocker locker(m_semid);
166 m_bufinfo->nbusy--;
167 m_procIsBusy = false;
168 }
169
170 shmdt(m_shmadr);
171 B2DEBUG(32, "RingBuffer: Cleaning up IPC");
172 if (m_new) {
173 shmctl(m_shmid, IPC_RMID, (struct shmid_ds*) nullptr);
175 if (m_file) {
176 unlink(m_pathname.c_str());
177 }
178 unlink(m_semshmFileName.c_str());
179 }
180}
181
183{
184 printf("bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
187}
188
// Append one entry of `size` int words, read from buf, to the ring buffer.
//
// Every stored entry occupies size + 2 words starting at its write offset:
//   word 0    : payload size in words
//   word 1    : offset (relative to m_buftop) of the following entry
//   words 2.. : payload
//
// buf     : payload to copy.
// size    : payload length in int words; size <= 0 triggers B2FATAL.
// checkTx : if true and numAttachedTx == 0, a warning is logged and exit(0)
//           is called.
//
// Returns size on success, or -1 if the entry cannot be stored in the current
// state (not enough room, or an errtype value that the branch does not accept).
// Throws std::runtime_error if the buffer is empty and size exceeds
// m_bufinfo->size + 2.
//
// errtype values written here: 0 (normal write), 1 (wrapped around to the start
// of the data area), 2 (wrote below rptr), 3 (wptr passed rptr after a write
// below rptr), 4 (see L247 equivalent below; also set by remq()).
//
// NOTE(review): this listing has gaps in the original line numbering (e.g. 212,
// 216, 238, 242, 268, 289, 298); statements on those lines are not visible.
189int RingBuffer::insq(const int* buf, int size, bool checkTx)
190{
191 if (size <= 0) {
192 B2FATAL("RingBuffer::insq() failed: invalid buffer size = " << size);
193 }
 // NOTE(review): numAttachedTx is read here before the semaphore is taken.
194 if (m_bufinfo->numAttachedTx == 0 and checkTx) {
195 //safe abort was requested
196 B2WARNING("Number of attached Tx is 0, so I will not go on with the processing.");
197 exit(0);
198 }
199 SemaphoreLocker locker(m_semid);
200 if (m_bufinfo->nbuf == 0) {
 // Empty buffer: restart at the beginning of the data area.
201 m_bufinfo->wptr = 0;
202 m_bufinfo->rptr = 0;
 // NOTE(review): the entry needs size + 2 words but the check allows
 // size up to m_bufinfo->size + 2 -- confirm this cannot write past the
 // end of the data area.
203 if (size > m_bufinfo->size + 2) {
204 throw std::runtime_error("[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
205 ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) + ")!");
206 }
207 m_bufinfo->errtype = 0;
208 int* wptr = m_buftop + m_bufinfo->wptr;
209 *wptr = size;
210 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
211 memcpy(wptr + 2, buf, size * sizeof(int));
213 m_bufinfo->wptr += (size + 2);
214 m_bufinfo->nbuf++;
215 m_bufinfo->ninsq++;
217 return size;
218 } else if (m_bufinfo->wptr > m_bufinfo->rptr) {
 // Writer is ahead of the reader; only errtype 0, 3 and 4 are accepted.
219 if (m_bufinfo->errtype != 4 &&
220 m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
221 B2ERROR("insq: Error in errtype 0; current=" << m_bufinfo->errtype);
222 return -1;
223 }
224 if (m_bufinfo->errtype == 3) {
225 // printf ( "---> errtype is 3, still remaining buffers\n" );
226 B2DEBUG(32, "[RingBuffer] errtype 3");
227 return -1;
228 } else if (m_bufinfo->errtype == 4) {
229 // printf ( "---> errtype returned to 0, wptr=%d, rptr=%d\n",
230 // m_bufinfo->wptr, m_bufinfo->rptr );
231 }
232 m_bufinfo->errtype = 0;
233 if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) { // normal case
234 int* wptr = m_buftop + m_bufinfo->wptr;
235 *wptr = size;
236 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
237 memcpy(wptr + 2, buf, size * sizeof(int));
239 m_bufinfo->wptr += (size + 2);
240 m_bufinfo->nbuf++;
241 m_bufinfo->ninsq++;
243 return size;
244 } else {
 // Not enough room up to the end of the data area: wrap to offset 0 if the
 // reader has already freed at least size + 2 words there.
245 if (m_bufinfo->rptr >= size + 2) { // buffer full and wptr>rptr
246 // this should be dead code but I don't understand it well enough to delete it
247 if (m_bufinfo->errtype != 0) {
248 B2ERROR("insq: Error in errtype 1; current=" << m_bufinfo->errtype);
249 return -1;
250 }
251 m_bufinfo->errtype = 1;
252 int* wptr = m_buftop;
253 memcpy(wptr + 2, buf, size * sizeof(int));
254 *wptr = size;
255 *(wptr + 1) = size + 2;
256 m_bufinfo->wptr = *(wptr + 1);
 // Point the "next" word of the entry at prevwptr to offset 0 so that the
 // reader continues at the start of the data area.
257 int* prevptr = m_buftop + m_bufinfo->prevwptr;
258 *(prevptr + 1) = 0;
259 m_bufinfo->prevwptr = 0;
 // nbuf == 0 was already handled by the first branch while the lock is
 // held, so this condition is not expected to be true here.
260 if (m_bufinfo->nbuf == 0) {
261 m_bufinfo->errtype = 4;
262 m_bufinfo->rptr = 0;
263 }
264 m_bufinfo->nbuf++;
265 // printf ( "insq: no more space, space below rptr; prev=%d, new=%d\n",
266 // m_bufinfo->prevwptr, m_bufinfo->wptr );
267 m_bufinfo->ninsq++;
269 return size;
270 } else {
271 // printf ( "insq: wptr>rptr, no more space, no space below rptr(%d), readbuf=%d\n",
272 // m_bufinfo->rptr, m_bufinfo->readbuf );
273 return -1;
274 }
275 }
276 } else { // wptr < rptr
 // Writer has wrapped and is behind the reader: the entry must fit strictly
 // below rptr; only errtype 1, 2 and 3 are accepted.
277 if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
278 size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
279 if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
280 m_bufinfo->errtype != 3) {
281 B2ERROR("insq: Error in errtype 2; current=" << m_bufinfo->errtype);
282 return -1;
283 }
284 m_bufinfo->errtype = 2;
285 int* wptr = m_buftop + m_bufinfo->wptr;
286 *wptr = size;
287 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
288 memcpy(wptr + 2, buf, size * sizeof(int));
290 m_bufinfo->wptr += (size + 2);
291 m_bufinfo->nbuf++;
292 // printf ( "insq: wptr<rptr and enough space below rptr; curr=%d, next=%d, rptr=%d\n", m_bufinfo->prevwptr, m_bufinfo->wptr, m_bufinfo->rptr );
293 if (m_bufinfo->wptr > m_bufinfo->rptr) {
294 B2DEBUG(32, "next pointer will exceed rptr.....");
295 m_bufinfo->errtype = 3;
296 }
297 m_bufinfo->ninsq++;
299 return size;
300 } else {
301 // printf ( "insq: wptr<rptr; no more space below rptr(%d), wptr(%d)\n",
302 // m_bufinfo->rptr, m_bufinfo->wptr );
303 return -1;
304 }
305 }
306}
307
// Remove the oldest entry from the ring buffer and optionally copy its payload.
//
// buf: destination for the payload words; if it is a null pointer the entry is
//      removed without being copied. The caller must provide enough room for
//      the entry (the size is not known before the call).
//
// Returns the payload size in int words, or 0 if the buffer is empty or the
// stored size word is not positive. Throws std::runtime_error if nbuf is
// negative.
//
// Also maintains the shared nbusy counter: this process is counted as busy
// after a successful read and as not busy again once it finds nothing to read.
//
// NOTE(review): original line 339 is missing from this listing; the statement
// on it is not visible.
308int RingBuffer::remq(int* buf)
309{
310 SemaphoreLocker locker(m_semid);
311 if (m_bufinfo->nbuf < 0) {
312 throw std::runtime_error("[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
313 }
314 if (m_bufinfo->nbuf == 0) {
 // Nothing to read: this process is no longer processing an entry.
315 if (m_procIsBusy) {
316 m_bufinfo->nbusy--;
317 m_procIsBusy = false;
318 }
319 return 0;
320 }
 // Word 0 of an entry is its payload size, word 1 the offset of the next entry.
321 int* r_ptr = m_buftop + m_bufinfo->rptr;
322 int nw = *r_ptr;
323 if (nw <= 0) {
 // Inconsistent size word: report and return without advancing rptr.
324 printf("RingBuffer::remq : buffer size = %d, skipped\n", nw);
325 printf("RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
326 if (m_procIsBusy) {
327 m_bufinfo->nbusy--;
328 m_procIsBusy = false;
329 }
330 return 0;
331 }
332 if (buf)
333 memcpy(buf, r_ptr + 2, nw * sizeof(int));
334 m_bufinfo->rptr = *(r_ptr + 1);
 // A "next" offset of 0 means the reader wrapped to the start of the data
 // area; insq() accepts errtype 4 in its wptr > rptr branch.
335 if (m_bufinfo->rptr == 0)
336 m_bufinfo->errtype = 4;
337 m_bufinfo->nbuf--;
338 m_bufinfo->nremq++;
340
341 if (not m_procIsBusy) {
342 m_bufinfo->nbusy++;
343 m_procIsBusy = true;
344 }
345 return nw;
346}
347
348int RingBuffer::spyq(int* buf) const
349{
350 SemaphoreLocker locker(m_semid);
351 if (m_bufinfo->nbuf <= 0) {
352 return 0;
353 }
354 int* r_ptr = m_buftop + m_bufinfo->rptr;
355 int nw = *r_ptr;
356 if (nw <= 0) {
357 printf("RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358 printf("RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
359 return 0;
360 }
361 // printf ( "remq : taking buf from %d(%d)\n", m_bufinfo->rptr, nw );
362 // Copy buffer without modifying management parameters.
363 memcpy(buf, r_ptr + 2, nw * sizeof(int));
364 // Exit
365 return nw;
366}
367
369{
370 return m_bufinfo->nbuf;
371}
372
374{
375 SemaphoreLocker locker(m_semid);
376 if (m_bufinfo->numAttachedTx == -1) //first attach
378
380}
382{
383 SemaphoreLocker locker(m_semid);
385 if (m_bufinfo->numAttachedTx < 0) {
387 }
388}
390{
392 m_bufinfo->nbuf = 0;
393}
395{
396 SemaphoreLocker locker(m_semid);
397 //NOTE: numAttachedTx == -1 also means we should read data (i.e. initialization pending)
398 return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
399}
401{
402 SemaphoreLocker locker(m_semid);
403 return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
404}
405
407{
408 return m_bufinfo->ninsq;
409}
410
412{
413 return m_bufinfo->nremq;
414}
415
417{
418 return m_insq_counter;
419}
420
422{
423 return m_remq_counter;
424}
425
427{
428 SemaphoreLocker locker(m_semid);
429 // m_bufinfo->size = m_shmsize - sizeof ( struct RingBufInfo );
431 m_bufinfo->wptr = 0;
432 m_bufinfo->prevwptr = 0;
433 m_bufinfo->rptr = 0;
434 m_bufinfo->nbuf = 0;
435 m_bufinfo->ninsq = 0;
436 m_bufinfo->nremq = 0;
437
438 return 0;
439}
440
442{
443 int val = 1;
444 if (semctl(m_semid, 0, SETVAL, val) == -1) { //set 0th semaphore to semval
445 B2ERROR("Initializing semaphore with semctl() failed.");
446 }
447 clear();
448}
449
451{
453 return 0;
454
455 return clear();
456}
457
459{
460 return m_shmid;
461}
462
464{
465 SemaphoreLocker locker(m_semid);
466
467 // Dump control parameters
468 printf("***** Ring Buffer Information ***\n");
469 printf("path = %s\n", m_pathname.c_str());
470 printf("shmsize = %d\n", m_shmsize);
471 printf("[Buffer Info]\n");
472 printf("bufsize = %d\n", m_bufinfo->size);
473 printf("remain = %d\n", m_bufinfo->remain);
474 printf("wptr = %d\n", m_bufinfo->wptr);
475 printf("prevwptr = %d\n", m_bufinfo->prevwptr);
476 printf("rptr = %d\n", m_bufinfo->rptr);
477 printf("nbuf = %d\n", m_bufinfo->nbuf);
478 printf("nattached = %d\n", m_bufinfo->nattached);
479 printf("nbusy = %d\n", m_bufinfo->nbusy);
480 printf("errtype = %d\n", m_bufinfo->errtype);
481 printf("numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
482 printf("ninsq = %d\n", m_bufinfo->ninsq);
483 printf("nremq = %d\n", m_bufinfo->ninsq);
484}
void dump_db()
Print some info on the RingBufInfo structure.
Definition: RingBuffer.cc:182
~RingBuffer()
Destructor.
Definition: RingBuffer.cc:79
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:189
int m_shmsize
Size of shared memory segment, in units of int (set from nwords in openSHM()).
Definition: RingBuffer.h:128
struct RingBufInfo * m_bufinfo
structure to manage ring buffer.
Definition: RingBuffer.h:129
int tryClear()
Clear the RingBuffer, if the semaphore isn't locked at the moment.
Definition: RingBuffer.cc:450
bool m_file
True if this is a named (global) ring buffer backed by the file m_pathname, which cleanup() unlinks.
Definition: RingBuffer.h:111
int remq_counter() const
Return number of remq() calls.
Definition: RingBuffer.cc:421
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426
void txAttached()
Increase the number of attached Tx counter.
Definition: RingBuffer.cc:373
int spyq(int *buf) const
Prefetch a buffer from the RingBuffer w/o removing it.
Definition: RingBuffer.cc:348
void cleanup()
Function to detach and remove shared memory.
Definition: RingBuffer.cc:162
int m_remq_counter
count remq() calls.
Definition: RingBuffer.h:132
void txDetached()
Decrease the number of attached Tx counter.
Definition: RingBuffer.cc:381
key_t m_shmkey
SHM key, see shmget(2).
Definition: RingBuffer.h:114
int m_semid
Semaphore ID.
Definition: RingBuffer.h:131
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:458
void dumpInfo() const
Dump contents of RingBufInfo metadata.
Definition: RingBuffer.cc:463
int numq() const
Returns number of entries/buffers in the RingBuffer.
Definition: RingBuffer.cc:368
int m_shmid
ID of shared memory segment.
Definition: RingBuffer.h:126
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
Definition: RingBuffer.cc:400
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:389
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
int m_pathfd
Associated file descriptor.
Definition: RingBuffer.h:113
int insq_counter() const
Return number of insq() calls.
Definition: RingBuffer.cc:416
std::string m_pathname
Path for identifying shared memory if named ring buffer is created.
Definition: RingBuffer.h:112
RingBuffer(int nwords=c_DefaultSize)
Constructor to create a new shared memory in private space.
Definition: RingBuffer.cc:32
void openSHM(int nwords)
open shared memory
Definition: RingBuffer.cc:84
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules (i.e. numAttachedTx == 0 and nbuf <= 0).
Definition: RingBuffer.cc:394
bool m_procIsBusy
Is this process currently processing events from this RingBuffer?
Definition: RingBuffer.h:124
int * m_buftop
Points to memory after the end of m_bufinfo.
Definition: RingBuffer.h:130
int m_insq_counter
count insq() calls.
Definition: RingBuffer.h:133
std::string m_semshmFileName
file path containing ids of shm and sema for private shared mem, used for easier cleanup if we fail t...
Definition: RingBuffer.h:117
bool m_new
True if we created the ring buffer ourselves (and need to clean it).
Definition: RingBuffer.h:110
int ninsq() const
Return number of insq() calls for current buffer.
Definition: RingBuffer.cc:406
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441
key_t m_semkey
Semaphore key, see semget(2).
Definition: RingBuffer.h:115
int nremq() const
Return number of remq() calls for current buffer.
Definition: RingBuffer.cc:411
int * m_shmadr
Address of attached shared memory segment.
Definition: RingBuffer.h:127
Handles creation, locking and unlocking of System V semaphores.
static void destroy(int semId)
Destroy the given semaphore.
static int create(key_t semkey)
Create a new semaphore and initialize it.
static bool isLocked(int semId)
Return true if the given semaphore is locked.
Abstract base class for different kinds of events.
STL namespace.
Internal metadata structure for RingBuffer.
Definition: RingBuffer.h:21
int numAttachedTx
number of attached sending processes.
Definition: RingBuffer.h:33
int errtype
Error state? 0: Normal, 1: buffer full and wptr>rptr, others are complicated.
Definition: RingBuffer.h:32
int wptr
Pointer for writing entries.
Definition: RingBuffer.h:24
int size
ring buffer size (integers), minus this header.
Definition: RingBuffer.h:22
int semid
Semaphore ID.
Definition: RingBuffer.h:28
int nattached
Number of RingBuffer instances currently attached to this buffer.
Definition: RingBuffer.h:29
int nbusy
Number of attached reading processes currently processing events.
Definition: RingBuffer.h:30
int nremq
Count remq() calls for this buffer.
Definition: RingBuffer.h:35
int nbuf
Number of entries in ring buffer.
Definition: RingBuffer.h:27
int rptr
Pointer for reading entries.
Definition: RingBuffer.h:26
int remain
Unsure, always equal to size.
Definition: RingBuffer.h:23
int prevwptr
Previous state of wptr (for error recovery).
Definition: RingBuffer.h:25
int ninsq
Count insq() calls for this buffer.
Definition: RingBuffer.h:34