Belle II Software light-2406-ragdoll
RingBuffer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/RingBuffer.h>
10#include <framework/pcore/SemaphoreLocker.h>
11#include <framework/logging/Logger.h>
12
13#include <sys/ipc.h>
14#include <sys/shm.h>
15
16#include <cerrno>
17#include <cstdio>
18#include <unistd.h>
19#include <cstring>
20#include <fcntl.h>
21#include <cstdlib>
22#include <sys/sem.h>
23
24#include <filesystem>
25#include <fstream>
26#include <stdexcept>
27
28using namespace std;
29using namespace Belle2;
30
31// Constructor of Private Ringbuffer
33{
34 openSHM(size);
35 B2DEBUG(32, "RingBuffer initialization done");
36}
37
38// Constructor of Global Ringbuffer with name
39RingBuffer::RingBuffer(const std::string& name, unsigned int nwords)
40{
41 // 0. Determine shared memory type
42 if (name != "private") { // Global
43 m_file = true;
44 m_pathname = std::string(std::filesystem::temp_directory_path() / getenv("USER")) + "_RB_" + name;
45 m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
46 if (m_pathfd > 0) { // a new shared memory file created
47 B2DEBUG(32, "[RingBuffer] Creating a ring buffer with key " << name);
48 m_new = true;
49 } else if (m_pathfd == -1 && errno == EEXIST) { // shm already there
50 B2DEBUG(32, "[RingBuffer] Attaching the ring buffer with key " << name);
51 m_new = false;
52 } else {
53 B2FATAL("RingBuffer: error opening shm file: " << m_pathname);
54 }
55 m_shmkey = ftok(m_pathname.c_str(), 1);
56 m_semkey = ftok(m_pathname.c_str(), 2);
57 } else { // Private
58 B2DEBUG(32, "[RingBuffer] Opening private ring buffer");
59 }
60
61 openSHM(nwords);
62
63 if (m_pathfd > 0) {
64 B2DEBUG(32, "First global RingBuffer creation: writing SHM info to file.");
65 char rbufinfo[256];
66 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_shmid);
67 int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
68 if (is < 0) perror("write");
69 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_semid);
70 is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
71 if (is < 0) perror("write");
72 close(m_pathfd);
73 }
74
75
76 B2DEBUG(32, "RingBuffer initialization done with shm=" << m_shmid);
77}
78
80{
81 cleanup();
82}
83
// Open (or attach to) the shared memory segment and its semaphore, then set up the control block.
//
// nwords: requested size of the segment in units of int (converted to bytes with sizeof(int)).
//
// If the first shmget() fails, the request is retried once with the value read from
// /proc/sys/kernel/shmmax (or with the same size if that file cannot be read). A second
// shmget() failure, a shmat() failure or an invalid semaphore id ends in B2FATAL.
// If m_new is set, the RingBufInfo header at the start of the segment is initialised and an
// empty ID file named after the shm and semaphore ids is created in the temp directory.
84void RingBuffer::openSHM(int nwords)
85{
86 // 1. Open shared memory
87 unsigned int sizeBytes = nwords * sizeof(int);
88 const auto mode = IPC_CREAT | 0644;
89 m_shmid = shmget(m_shmkey, sizeBytes, mode);
90 if (m_shmid < 0) {
// Fallback: retry with the kernel limit. maxSizeBytes keeps the requested size if the file is unreadable.
// NOTE(review): maxSizeBytes is an unsigned int; a shmmax value larger than that cannot be represented - confirm the intended behaviour.
91 unsigned int maxSizeBytes = sizeBytes;
92 ifstream shmax("/proc/sys/kernel/shmmax");
93 if (shmax.good())
94 shmax >> maxSizeBytes;
95 shmax.close();
96
97 B2WARNING("RingBuffer: shmget(" << sizeBytes << ") failed, limiting to system maximum: " << maxSizeBytes);
98 sizeBytes = maxSizeBytes;
// nwords is reduced as well, so that m_shmsize below reflects the size actually requested.
99 nwords = maxSizeBytes / sizeof(int);
100 m_shmid = shmget(m_shmkey, sizeBytes, mode);
101 if (m_shmid < 0) {
102 B2FATAL("RingBuffer: shmget(" << sizeBytes <<
103 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
104 }
105 }
// shmat() signals failure with the pointer value -1, hence the comparison against (int*) -1.
106 m_shmadr = (int*) shmat(m_shmid, nullptr, 0);
107 if (m_shmadr == (int*) - 1) {
108 B2FATAL("RingBuffer: Attaching to shared memory segment via shmat() failed");
109 }
110
111 // 2. Open Semaphore
// NOTE(review): the listing numbering jumps from 111 to 113, so the statement that assigns m_semid
// is not visible here - presumably a SemaphoreLocker::create(m_semkey) call; confirm against the repository.
113 if (m_semid < 0) {
114 cleanup();
115 B2FATAL("Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
116 }
117 SemaphoreLocker locker(m_semid); //prevent simultaneous initialization
118
119 // 3. Initialize control parameters
120 m_shmsize = nwords;
121 m_bufinfo = reinterpret_cast<RingBufInfo*>(m_shmadr);
// m_shmadr is an int*, so this advances by sizeof(RingBufInfo) ints (not bytes); the size stored
// below subtracts the same count from m_shmsize, i.e. both are expressed in ints.
122 m_buftop = m_shmadr + sizeof(struct RingBufInfo);
123 if (m_new) {
// We created the buffer: reset the shared header.
// NOTE(review): listing lines 125, 130 and 134 are absent, so further header fields may be
// initialised here (RingBufInfo also has remain, semid and numAttachedTx) - confirm.
124 m_bufinfo->size = m_shmsize - sizeof(struct RingBufInfo);
126 m_bufinfo->wptr = 0;
127 m_bufinfo->prevwptr = 0;
128 m_bufinfo->rptr = 0;
129 m_bufinfo->nbuf = 0;
131 m_bufinfo->nattached = 1;
132 m_bufinfo->nbusy = 0;
133 m_bufinfo->errtype = 0;
135 m_bufinfo->ninsq = 0;
136 m_bufinfo->nremq = 0;
137 } else {
// Attaching to an existing buffer: the header is left as found and only logged.
// NOTE(review): listing line 138 is absent; a statement may be missing here - confirm.
139
140 B2DEBUG(32, "[RingBuffer] check entries = " << m_bufinfo->nbuf);
141 B2DEBUG(32, "[RingBuffer] check size = " << m_bufinfo->size);
142 }
143
// Per-instance counters (not part of the shared header).
144 m_remq_counter = 0;
145 m_insq_counter = 0;
146
147 if (m_new) {
148 // Leave id of shm and semaphore in file name
149 m_semshmFileName = std::string(std::filesystem::temp_directory_path() / "SHM") + to_string(m_shmid) + "-SEM" + to_string(
150 m_semid) + "-UNNAMED";
// The file stays empty; only its name carries the ids. Failure to create it is not fatal.
151 int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
152 if (fd < 0) {
153 B2WARNING("RingBuffer ID file could not be created.");
154 } else {
155 close(fd);
156 }
157 }
158
159 B2DEBUG(35, "buftop = " << m_buftop << ", end = " << (m_buftop + m_bufinfo->size));
160}
161
// NOTE(review): the function header (listing line 162) is missing from this extract. According to the
// reference index at the end of the file this is the body of `void cleanup()`, described there as
// "Function to detach and remove shared memory."
//
// Visible behaviour: un-registers this process as busy, detaches the segment and, if this
// instance created the buffer (m_new), marks the segment for removal and deletes the helper files.
163{
164 if (m_procIsBusy) {
// This process is still counted in the shared nbusy counter: take it out under the semaphore.
165 SemaphoreLocker locker(m_semid);
166 m_bufinfo->nbusy--;
167 m_procIsBusy = false;
168 }
169
170 shmdt(m_shmadr);
171 B2DEBUG(32, "RingBuffer: Cleaning up IPC");
172 if (m_new) {
173 shmctl(m_shmid, IPC_RMID, (struct shmid_ds*) nullptr);
// NOTE(review): listing line 174 is absent; presumably the semaphore is destroyed here
// (the index lists SemaphoreLocker::destroy(int semId)) - confirm against the repository.
175 if (m_file) {
// Named buffer: remove the marker file so the name can be reused.
176 unlink(m_pathname.c_str());
177 }
// Remove the ID file created in openSHM().
178 unlink(m_semshmFileName.c_str());
179 }
180}
181
// NOTE(review): the function header (listing line 182) is missing from this extract. According to the
// reference index at the end of the file this is the body of `void dump_db()`, described there as
// "Print some info on the RingBufInfo structure."
183{
// NOTE(review): listing lines 185-186 are absent, so the argument list and the closing of this
// printf() call are not visible; the format expects five ints (bufsize, remain, wptr, rptr, nbuf).
184 printf("bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
187}
188
// Append one entry of `size` ints, read from `buf`, to the ring buffer.
//
// Each entry is stored as [length][offset of next entry][payload...], i.e. it occupies size + 2 ints.
//
// buf:     payload to copy into the buffer
// size:    number of ints in buf; a value <= 0 is a fatal error
// checkTx: if true and numAttachedTx is 0, the process exits with status 0
//
// Returns size on success and -1 if the entry could not be stored in the current state.
// Throws std::runtime_error if the buffer is empty and the size check below rejects the entry.
//
// NOTE(review): the listing numbering skips lines 212, 216, 238, 242, 268, 289 and 298, so some
// statements of this function are not visible here - presumably updates of m_bufinfo->prevwptr
// and of m_insq_counter; confirm against the repository before editing.
189int RingBuffer::insq(const int* buf, int size, bool checkTx)
190{
191 if (size <= 0) {
192 B2FATAL("RingBuffer::insq() failed: invalid buffer size = " << size);
193 }
// This check reads numAttachedTx before the semaphore is taken.
194 if (m_bufinfo->numAttachedTx == 0 and checkTx) {
195 //safe abort was requested
196 B2WARNING("Number of attached Tx is 0, so I will not go on with the processing.");
197 exit(0);
198 }
199 SemaphoreLocker locker(m_semid);
// Case A: buffer is empty - restart at the beginning of the data area.
200 if (m_bufinfo->nbuf == 0) {
201 m_bufinfo->wptr = 0;
202 m_bufinfo->rptr = 0;
// NOTE(review): an entry needs size + 2 ints, so this looks like it should reject
// size + 2 > m_bufinfo->size; as written, sizes up to m_bufinfo->size + 2 pass - confirm.
203 if (size > m_bufinfo->size + 2) {
204 throw std::runtime_error("[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
205 ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) + ")!");
206 }
207 m_bufinfo->errtype = 0;
208 int* wptr = m_buftop + m_bufinfo->wptr;
// Entry header: word 0 = payload length, word 1 = offset of the following entry.
209 *wptr = size;
210 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
211 memcpy(wptr + 2, buf, size * sizeof(int));
213 m_bufinfo->wptr += (size + 2);
214 m_bufinfo->nbuf++;
215 m_bufinfo->ninsq++;
217 return size;
// Case B: write pointer is ahead of the read pointer.
218 } else if (m_bufinfo->wptr > m_bufinfo->rptr) {
// Only errtype 0, 3 and 4 are accepted in this state.
219 if (m_bufinfo->errtype != 4 &&
220 m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
221 B2ERROR("insq: Error in errtype 0; current=" << m_bufinfo->errtype);
222 return -1;
223 }
224 if (m_bufinfo->errtype == 3) {
225 // printf ( "---> errtype is 3, still remaining buffers\n" );
226 B2DEBUG(32, "[RingBuffer] errtype 3");
227 return -1;
228 } else if (m_bufinfo->errtype == 4) {
229 // printf ( "---> errtype returned to 0, wptr=%d, rptr=%d\n",
230 // m_bufinfo->wptr, m_bufinfo->rptr );
231 }
232 m_bufinfo->errtype = 0;
// B1: the entry fits between wptr and the end of the data area.
233 if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) { // normal case
234 int* wptr = m_buftop + m_bufinfo->wptr;
235 *wptr = size;
236 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
237 memcpy(wptr + 2, buf, size * sizeof(int));
239 m_bufinfo->wptr += (size + 2);
240 m_bufinfo->nbuf++;
241 m_bufinfo->ninsq++;
243 return size;
244 } else {
// B2: no room at the end; wrap around to offset 0 if the space below rptr is large enough.
245 if (m_bufinfo->rptr >= size + 2) { // buffer full and wptr>rptr
246 // this should be dead code but I don't understand it well enough to delete it
247 if (m_bufinfo->errtype != 0) {
248 B2ERROR("insq: Error in errtype 1; current=" << m_bufinfo->errtype);
249 return -1;
250 }
251 m_bufinfo->errtype = 1;
252 int* wptr = m_buftop;
253 memcpy(wptr + 2, buf, size * sizeof(int));
254 *wptr = size;
255 *(wptr + 1) = size + 2;
256 m_bufinfo->wptr = *(wptr + 1);
// Redirect the "next" link of the previously written entry to offset 0 (the wrapped entry).
257 int* prevptr = m_buftop + m_bufinfo->prevwptr;
258 *(prevptr + 1) = 0;
259 m_bufinfo->prevwptr = 0;
260 if (m_bufinfo->nbuf == 0) {
261 m_bufinfo->errtype = 4;
262 m_bufinfo->rptr = 0;
263 }
264 m_bufinfo->nbuf++;
265 // printf ( "insq: no more space, space below rptr; prev=%d, new=%d\n",
266 // m_bufinfo->prevwptr, m_bufinfo->wptr );
267 m_bufinfo->ninsq++;
269 return size;
270 } else {
271 // printf ( "insq: wptr>rptr, no more space, no space below rptr(%d), readbuf=%d\n",
272 // m_bufinfo->rptr, m_bufinfo->readbuf );
273 return -1;
274 }
275 }
// Case C: write pointer has wrapped and is at or below the read pointer.
276 } else { // wptr < rptr
// The entry must fit into the gap below rptr; the second condition additionally compares
// the entry size with the space between rptr and the end of the data area.
277 if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
278 size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
279 if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
280 m_bufinfo->errtype != 3) {
281 B2ERROR("insq: Error in errtype 2; current=" << m_bufinfo->errtype);
282 return -1;
283 }
284 m_bufinfo->errtype = 2;
285 int* wptr = m_buftop + m_bufinfo->wptr;
286 *wptr = size;
287 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
288 memcpy(wptr + 2, buf, size * sizeof(int));
290 m_bufinfo->wptr += (size + 2);
291 m_bufinfo->nbuf++;
292 // printf ( "insq: wptr<rptr and enough space below rptr; curr=%d, next=%d, rptr=%d\n", m_bufinfo->prevwptr, m_bufinfo->wptr, m_bufinfo->rptr );
293 if (m_bufinfo->wptr > m_bufinfo->rptr) {
294 B2DEBUG(32, "next pointer will exceed rptr.....");
295 m_bufinfo->errtype = 3;
296 }
297 m_bufinfo->ninsq++;
299 return size;
300 } else {
301 // printf ( "insq: wptr<rptr; no more space below rptr(%d), wptr(%d)\n",
302 // m_bufinfo->rptr, m_bufinfo->wptr );
303 return -1;
304 }
305 }
306}
307
// Remove the oldest entry from the ring buffer and copy its payload into `buf`.
//
// buf: destination for the payload; if it is a null pointer the entry is removed without copying.
//
// Returns the number of ints of the removed entry, or 0 if the buffer is empty or the stored
// length is not positive (in both cases nothing is removed).
// Throws std::runtime_error if the shared entry counter is negative.
//
// Side effect on the shared nbusy counter: a successful read marks this process as busy
// (once), an unsuccessful one clears that mark again.
308int RingBuffer::remq(int* buf)
309{
310 SemaphoreLocker locker(m_semid);
311 if (m_bufinfo->nbuf < 0) {
312 throw std::runtime_error("[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
313 }
314 if (m_bufinfo->nbuf == 0) {
// Nothing to read: this process is no longer processing an event from the buffer.
315 if (m_procIsBusy) {
316 m_bufinfo->nbusy--;
317 m_procIsBusy = false;
318 }
319 return 0;
320 }
// Entry layout: word 0 = payload length, word 1 = offset of the next entry, then the payload.
321 int* r_ptr = m_buftop + m_bufinfo->rptr;
322 int nw = *r_ptr;
323 if (nw <= 0) {
// Invalid length word: report it and return without advancing rptr.
324 printf("RingBuffer::remq : buffer size = %d, skipped\n", nw);
325 printf("RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
326 if (m_procIsBusy) {
327 m_bufinfo->nbusy--;
328 m_procIsBusy = false;
329 }
330 return 0;
331 }
332 if (buf)
333 memcpy(buf, r_ptr + 2, nw * sizeof(int));
// Follow the "next" link; a link of 0 means the reader wrapped to the start, flagged as errtype 4.
334 m_bufinfo->rptr = *(r_ptr + 1);
335 if (m_bufinfo->rptr == 0)
336 m_bufinfo->errtype = 4;
337 m_bufinfo->nbuf--;
338 m_bufinfo->nremq++;
// NOTE(review): listing line 339 is absent; presumably m_remq_counter is incremented here - confirm.
340
341 if (not m_procIsBusy) {
342 m_bufinfo->nbusy++;
343 m_procIsBusy = true;
344 }
345 return nw;
346}
347
348int RingBuffer::spyq(int* buf) const
349{
350 SemaphoreLocker locker(m_semid);
351 if (m_bufinfo->nbuf <= 0) {
352 return 0;
353 }
354 int* r_ptr = m_buftop + m_bufinfo->rptr;
355 int nw = *r_ptr;
356 if (nw <= 0) {
357 printf("RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358 printf("RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
359 return 0;
360 }
361 // printf ( "remq : taking buf from %d(%d)\n", m_bufinfo->rptr, nw );
362 // Copy buffer without modifying management parameters.
363 memcpy(buf, r_ptr + 2, nw * sizeof(int));
364 // Exit
365 return nw;
366}
367
369{
370 return m_bufinfo->nbuf;
371}
372
// NOTE(review): the function header (listing line 373) is missing from this extract. According to the
// reference index at the end of the file this is the body of `void txAttached()`, described there as
// "Increase the number of attached Tx counter."
374{
375 SemaphoreLocker locker(m_semid);
// numAttachedTx == -1 marks the state before the first Tx attached.
// NOTE(review): listing lines 377 and 379 are absent, so neither the statement controlled by this
// `if` nor the increment itself is visible here - restore them from the repository.
376 if (m_bufinfo->numAttachedTx == -1) //first attach
378

380}
// NOTE(review): the function header (listing line 381) is missing from this extract. According to the
// reference index at the end of the file this is the body of `void txDetached()`, described there as
// "Decrease the number of attached Tx counter."
382{
383 SemaphoreLocker locker(m_semid);
// NOTE(review): listing lines 384 and 386 are absent, so the decrement and the body of the
// underflow check below are not visible here - restore them from the repository.
385 if (m_bufinfo->numAttachedTx < 0) {
387 }
388}
// NOTE(review): the function header (listing line 389) is missing from this extract. According to the
// reference index at the end of the file this is the body of `void kill()`, described there as
// "Cause termination of reading processes (if they use isDead())."
390{
// NOTE(review): listing line 391 is absent; isDead() additionally requires numAttachedTx == 0,
// so that counter is presumably reset here - confirm against the repository.
// The entry counter is reset without taking the semaphore.
392 m_bufinfo->nbuf = 0;
393}
395{
396 SemaphoreLocker locker(m_semid);
397 //NOTE: numAttachedTx == -1 also means we should read data (i.e. initialization pending)
398 return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
399}
401{
402 SemaphoreLocker locker(m_semid);
403 return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
404}
405
407{
408 return m_bufinfo->ninsq;
409}
410
412{
413 return m_bufinfo->nremq;
414}
415
417{
418 return m_insq_counter;
419}
420
422{
423 return m_remq_counter;
424}
425
// NOTE(review): the function header (listing line 426) is missing from this extract. According to the
// reference index at the end of the file this is the body of `int clear()`, described there as
// "Clear the RingBuffer."
//
// Visible behaviour: under the semaphore, resets the read/write offsets, the entry count and the
// shared insq/remq counters to 0; errtype, nbusy and numAttachedTx are not touched. Always returns 0.
427{
428 SemaphoreLocker locker(m_semid);
429 // m_bufinfo->size = m_shmsize - sizeof ( struct RingBufInfo );
// NOTE(review): listing line 430 is absent; a further header field may be reset here - confirm.
431 m_bufinfo->wptr = 0;
432 m_bufinfo->prevwptr = 0;
433 m_bufinfo->rptr = 0;
434 m_bufinfo->nbuf = 0;
435 m_bufinfo->ninsq = 0;
436 m_bufinfo->nremq = 0;
437
438 return 0;
439}
440
442{
443 int val = 1;
444 if (semctl(m_semid, 0, SETVAL, val) == -1) { //set 0th semaphore to semval
445 B2ERROR("Initializing semaphore with semctl() failed.");
446 }
447 clear();
448}
449
451{
453 return 0;
454
455 return clear();
456}
457
459{
460 return m_shmid;
461}
462
464{
465 SemaphoreLocker locker(m_semid);
466
467 // Dump control parameters
468 printf("***** Ring Buffer Information ***\n");
469 printf("path = %s\n", m_pathname.c_str());
470 printf("shmsize = %d\n", m_shmsize);
471 printf("[Buffer Info]\n");
472 printf("bufsize = %d\n", m_bufinfo->size);
473 printf("remain = %d\n", m_bufinfo->remain);
474 printf("wptr = %d\n", m_bufinfo->wptr);
475 printf("prevwptr = %d\n", m_bufinfo->prevwptr);
476 printf("rptr = %d\n", m_bufinfo->rptr);
477 printf("nbuf = %d\n", m_bufinfo->nbuf);
478 printf("nattached = %d\n", m_bufinfo->nattached);
479 printf("nbusy = %d\n", m_bufinfo->nbusy);
480 printf("errtype = %d\n", m_bufinfo->errtype);
481 printf("numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
482 printf("ninsq = %d\n", m_bufinfo->ninsq);
483 printf("nremq = %d\n", m_bufinfo->ninsq);
484}
void dump_db()
Print some info on the RingBufInfo structure.
Definition: RingBuffer.cc:182
~RingBuffer()
Destructor.
Definition: RingBuffer.cc:79
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:189
int m_shmsize
Size of shared memory segment, in units of int (openSHM() stores nwords here), not in bytes.
Definition: RingBuffer.h:128
struct RingBufInfo * m_bufinfo
structure to manage ring buffer.
Definition: RingBuffer.h:129
int tryClear()
Clear the RingBuffer, if the semaphore isn't locked at the moment.
Definition: RingBuffer.cc:450
bool m_file
True if m_pathfd needs to be closed.
Definition: RingBuffer.h:111
int remq_counter() const
Return number of remq() calls.
Definition: RingBuffer.cc:421
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426
void txAttached()
Increase the number of attached Tx counter.
Definition: RingBuffer.cc:373
int spyq(int *buf) const
Prefetch a buffer from the RingBuffer w/o removing it.
Definition: RingBuffer.cc:348
void cleanup()
Function to detach and remove shared memory.
Definition: RingBuffer.cc:162
int m_remq_counter
count remq() calls.
Definition: RingBuffer.h:132
void txDetached()
Decrease the number of attached Tx counter.
Definition: RingBuffer.cc:381
key_t m_shmkey
SHM key, see shmget(2).
Definition: RingBuffer.h:114
int m_semid
Semaphore ID.
Definition: RingBuffer.h:131
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:458
void dumpInfo() const
Dump contents of RingBufInfo metadata.
Definition: RingBuffer.cc:463
int numq() const
Returns number of entries/buffers in the RingBuffer.
Definition: RingBuffer.cc:368
int m_shmid
ID of shared memory segment.
Definition: RingBuffer.h:126
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
Definition: RingBuffer.cc:400
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:389
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
int m_pathfd
Associated file descriptor.
Definition: RingBuffer.h:113
int insq_counter() const
Return number of insq() calls.
Definition: RingBuffer.cc:416
std::string m_pathname
Path for identifying shared memory if named ring buffer is created.
Definition: RingBuffer.h:112
RingBuffer(int nwords=c_DefaultSize)
Constructor to create a new shared memory in private space.
Definition: RingBuffer.cc:32
void openSHM(int nwords)
open shared memory
Definition: RingBuffer.cc:84
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules (i.e. no further data is expected to be inserted).
Definition: RingBuffer.cc:394
bool m_procIsBusy
Is this process currently processing events from this RingBuffer?
Definition: RingBuffer.h:124
int * m_buftop
Points to memory after the end of m_bufinfo.
Definition: RingBuffer.h:130
int m_insq_counter
count insq() calls.
Definition: RingBuffer.h:133
std::string m_semshmFileName
File path containing the ids of the shm and semaphore for private shared memory, used for easier cleanup if we fail to clean up ourselves.
Definition: RingBuffer.h:117
bool m_new
True if we created the ring buffer ourselves (and need to clean it).
Definition: RingBuffer.h:110
int ninsq() const
Return number of insq() calls for current buffer.
Definition: RingBuffer.cc:406
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
Definition: RingBuffer.cc:441
key_t m_semkey
Semaphore key, see semget(2).
Definition: RingBuffer.h:115
int nremq() const
Return number of remq() calls for current buffer.
Definition: RingBuffer.cc:411
int * m_shmadr
Address of attached shared memory segment.
Definition: RingBuffer.h:127
Handles creation, locking and unlocking of System V semaphores.
static void destroy(int semId)
Destroy the given semaphore.
static int create(key_t semkey)
Create a new semaphore and initialize it.
static bool isLocked(int semId)
Return true if the given semaphore is locked.
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:24
STL namespace.
Internal metadata structure for RingBuffer.
Definition: RingBuffer.h:21
int numAttachedTx
number of attached sending processes.
Definition: RingBuffer.h:33
int errtype
Error state? 0: Normal, 1: buffer full and wptr>rptr, others are complicated.
Definition: RingBuffer.h:32
int wptr
Pointer for writing entries.
Definition: RingBuffer.h:24
int size
ring buffer size (integers), minus this header.
Definition: RingBuffer.h:22
int semid
Semaphore ID.
Definition: RingBuffer.h:28
int nattached
Number of RingBuffer instances currently attached to this buffer.
Definition: RingBuffer.h:29
int nbusy
Number of attached reading processes currently processing events.
Definition: RingBuffer.h:30
int nremq
Count remq() calls for this buffer.
Definition: RingBuffer.h:35
int nbuf
Number of entries in ring buffer.
Definition: RingBuffer.h:27
int rptr
Pointer for reading entries.
Definition: RingBuffer.h:26
int remain
Unsure, always equal to size.
Definition: RingBuffer.h:23
int prevwptr
Previous state of wptr (for error recovery).
Definition: RingBuffer.h:25
int ninsq
Count insq() calls for this buffer.
Definition: RingBuffer.h:34