Belle II Software prerelease-10-00-00a
RingBuffer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/RingBuffer.h>
10#include <framework/pcore/SemaphoreLocker.h>
11#include <framework/logging/Logger.h>
12
13#include <sys/ipc.h>
14#include <sys/shm.h>
15
16#include <cerrno>
17#include <cstdio>
18#include <unistd.h>
19#include <cstring>
20#include <fcntl.h>
21#include <cstdlib>
22#include <sys/sem.h>
23
24#include <filesystem>
25#include <fstream>
26#include <stdexcept>
27
28using namespace std;
29using namespace Belle2;
30
31// Constructor of Private Ringbuffer
33{
34 openSHM(size);
35 B2DEBUG(32, "RingBuffer initialization done");
36}
37
38// Constructor of Global Ringbuffer with name
39RingBuffer::RingBuffer(const std::string& name, unsigned int nwords)
40{
41 // 0. Determine shared memory type
42 if (name != "private") { // Global
43 m_file = true;
44 m_pathname = std::string(std::filesystem::temp_directory_path() / getenv("USER")) + "_RB_" + name;
45 m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
46 if (m_pathfd > 0) { // a new shared memory file created
47 B2DEBUG(32, "[RingBuffer] Creating a ring buffer with key " << name);
48 m_new = true;
49 } else if (m_pathfd == -1 && errno == EEXIST) { // shm already there
50 B2DEBUG(32, "[RingBuffer] Attaching the ring buffer with key " << name);
51 m_new = false;
52 } else {
53 B2FATAL("RingBuffer: error opening shm file: " << m_pathname);
54 }
55 m_shmkey = ftok(m_pathname.c_str(), 1);
56 m_semkey = ftok(m_pathname.c_str(), 2);
57 } else { // Private
58 B2DEBUG(32, "[RingBuffer] Opening private ring buffer");
59 }
60
61 openSHM(nwords);
62
63 if (m_pathfd > 0) {
64 B2DEBUG(32, "First global RingBuffer creation: writing SHM info to file.");
65 char rbufinfo[256];
66 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_shmid);
67 int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
68 if (is < 0) perror("write");
69 snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_semid);
70 is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
71 if (is < 0) perror("write");
72 close(m_pathfd);
73 }
74
75
76 B2DEBUG(32, "RingBuffer initialization done with shm=" << m_shmid);
77}
78
83
84void RingBuffer::openSHM(int nwords)
85{
86 // 1. Open shared memory
87 unsigned int sizeBytes = nwords * sizeof(int);
88 const auto mode = IPC_CREAT | 0644;
89 m_shmid = shmget(m_shmkey, sizeBytes, mode);
90 if (m_shmid < 0) {
91 unsigned int maxSizeBytes = sizeBytes;
92 ifstream shmax("/proc/sys/kernel/shmmax");
93 if (shmax.good())
94 shmax >> maxSizeBytes;
95 shmax.close();
96
97 B2WARNING("RingBuffer: shmget(" << sizeBytes << ") failed, limiting to system maximum: " << maxSizeBytes);
98 sizeBytes = maxSizeBytes;
99 nwords = maxSizeBytes / sizeof(int);
100 m_shmid = shmget(m_shmkey, sizeBytes, mode);
101 if (m_shmid < 0) {
102 B2FATAL("RingBuffer: shmget(" << sizeBytes <<
103 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
104 }
105 }
106 m_shmadr = (int*) shmat(m_shmid, nullptr, 0);
107 if (m_shmadr == (int*) - 1) {
108 B2FATAL("RingBuffer: Attaching to shared memory segment via shmat() failed");
109 }
110
111 // 2. Open Semaphore
113 if (m_semid < 0) {
114 cleanup();
115 B2FATAL("Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
116 }
117 SemaphoreLocker locker(m_semid); //prevent simultaneous initialization
118
119 // 3. Initialize control parameters
120 m_shmsize = nwords;
121 m_bufinfo = reinterpret_cast<RingBufInfo*>(m_shmadr);
122 m_buftop = m_shmadr + sizeof(struct RingBufInfo);
123 if (m_new) {
124 m_bufinfo->size = m_shmsize - sizeof(struct RingBufInfo);
125 m_bufinfo->remain = m_bufinfo->size;
126 m_bufinfo->wptr = 0;
127 m_bufinfo->prevwptr = 0;
128 m_bufinfo->rptr = 0;
129 m_bufinfo->nbuf = 0;
130 m_bufinfo->semid = m_semid;
131 m_bufinfo->nattached = 1;
132 m_bufinfo->nbusy = 0;
133 m_bufinfo->errtype = 0;
134 m_bufinfo->numAttachedTx = -1;
135 m_bufinfo->ninsq = 0;
136 m_bufinfo->nremq = 0;
137 } else {
138 m_bufinfo->nattached++;
139
140 B2DEBUG(32, "[RingBuffer] check entries = " << m_bufinfo->nbuf);
141 B2DEBUG(32, "[RingBuffer] check size = " << m_bufinfo->size);
142 }
143
144 m_remq_counter = 0;
145 m_insq_counter = 0;
146
147 if (m_new) {
148 // Leave id of shm and semaphore in file name
149 m_semshmFileName = std::string(std::filesystem::temp_directory_path() / "SHM") + to_string(m_shmid) + "-SEM" + to_string(
150 m_semid) + "-UNNAMED";
151 int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
152 if (fd < 0) {
153 B2WARNING("RingBuffer ID file could not be created.");
154 } else {
155 close(fd);
156 }
157 }
158
159 B2DEBUG(35, "buftop = " << m_buftop << ", end = " << (m_buftop + m_bufinfo->size));
160}
161
163{
164 if (m_procIsBusy) {
165 SemaphoreLocker locker(m_semid);
166 m_bufinfo->nbusy--;
167 m_procIsBusy = false;
168 }
169
170 shmdt(m_shmadr);
171 B2DEBUG(32, "RingBuffer: Cleaning up IPC");
172 if (m_new) {
173 shmctl(m_shmid, IPC_RMID, (struct shmid_ds*) nullptr);
175 if (m_file) {
176 unlink(m_pathname.c_str());
177 }
178 unlink(m_semshmFileName.c_str());
179 }
180}
181
183{
184 printf("bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
185 m_bufinfo->size, m_bufinfo->remain,
186 m_bufinfo->wptr, m_bufinfo->rptr, m_bufinfo->nbuf);
187}
188
189int RingBuffer::insq(const int* buf, int size, bool checkTx)
190{
191 if (size <= 0) {
192 B2FATAL("RingBuffer::insq() failed: invalid buffer size = " << size);
193 }
194 if (m_bufinfo->numAttachedTx == 0 and checkTx) {
195 //safe abort was requested
196 B2WARNING("Number of attached Tx is 0, so I will not go on with the processing.");
197 exit(0);
198 }
199 SemaphoreLocker locker(m_semid);
200 if (m_bufinfo->nbuf == 0) {
201 m_bufinfo->wptr = 0;
202 m_bufinfo->rptr = 0;
203 if (size > m_bufinfo->size + 2) {
204 throw std::runtime_error("[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
205 ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) + ")!");
206 }
207 m_bufinfo->errtype = 0;
208 int* wptr = m_buftop + m_bufinfo->wptr;
209 *wptr = size;
210 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
211 memcpy(wptr + 2, buf, size * sizeof(int));
212 m_bufinfo->prevwptr = m_bufinfo->wptr;
213 m_bufinfo->wptr += (size + 2);
214 m_bufinfo->nbuf++;
215 m_bufinfo->ninsq++;
217 return size;
218 } else if (m_bufinfo->wptr > m_bufinfo->rptr) {
219 if (m_bufinfo->errtype != 4 &&
220 m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
221 B2ERROR("insq: Error in errtype 0; current=" << m_bufinfo->errtype);
222 return -1;
223 }
224 if (m_bufinfo->errtype == 3) {
225 // printf ( "---> errtype is 3, still remaining buffers\n" );
226 B2DEBUG(32, "[RingBuffer] errtype 3");
227 return -1;
228 } else if (m_bufinfo->errtype == 4) {
229 // printf ( "---> errtype returned to 0, wptr=%d, rptr=%d\n",
230 // m_bufinfo->wptr, m_bufinfo->rptr );
231 }
232 m_bufinfo->errtype = 0;
233 if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) { // normal case
234 int* wptr = m_buftop + m_bufinfo->wptr;
235 *wptr = size;
236 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
237 memcpy(wptr + 2, buf, size * sizeof(int));
238 m_bufinfo->prevwptr = m_bufinfo->wptr;
239 m_bufinfo->wptr += (size + 2);
240 m_bufinfo->nbuf++;
241 m_bufinfo->ninsq++;
243 return size;
244 } else {
245 if (m_bufinfo->rptr >= size + 2) { // buffer full and wptr>rptr
246 // this should be dead code but I don't understand it well enough to delete it
247 if (m_bufinfo->errtype != 0) {
248 B2ERROR("insq: Error in errtype 1; current=" << m_bufinfo->errtype);
249 return -1;
250 }
251 m_bufinfo->errtype = 1;
252 int* wptr = m_buftop;
253 memcpy(wptr + 2, buf, size * sizeof(int));
254 *wptr = size;
255 *(wptr + 1) = size + 2;
256 m_bufinfo->wptr = *(wptr + 1);
257 int* prevptr = m_buftop + m_bufinfo->prevwptr;
258 *(prevptr + 1) = 0;
259 m_bufinfo->prevwptr = 0;
260 if (m_bufinfo->nbuf == 0) {
261 m_bufinfo->errtype = 4;
262 m_bufinfo->rptr = 0;
263 }
264 m_bufinfo->nbuf++;
265 // printf ( "insq: no more space, space below rptr; prev=%d, new=%d\n",
266 // m_bufinfo->prevwptr, m_bufinfo->wptr );
267 m_bufinfo->ninsq++;
269 return size;
270 } else {
271 // printf ( "insq: wptr>rptr, no more space, no space below rptr(%d), readbuf=%d\n",
272 // m_bufinfo->rptr, m_bufinfo->readbuf );
273 return -1;
274 }
275 }
276 } else { // wptr < rptr
277 if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
278 size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
279 if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
280 m_bufinfo->errtype != 3) {
281 B2ERROR("insq: Error in errtype 2; current=" << m_bufinfo->errtype);
282 return -1;
283 }
284 m_bufinfo->errtype = 2;
285 int* wptr = m_buftop + m_bufinfo->wptr;
286 *wptr = size;
287 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
288 memcpy(wptr + 2, buf, size * sizeof(int));
289 m_bufinfo->prevwptr = m_bufinfo->wptr;
290 m_bufinfo->wptr += (size + 2);
291 m_bufinfo->nbuf++;
292 // printf ( "insq: wptr<rptr and enough space below rptr; curr=%d, next=%d, rptr=%d\n", m_bufinfo->prevwptr, m_bufinfo->wptr, m_bufinfo->rptr );
293 if (m_bufinfo->wptr > m_bufinfo->rptr) {
294 B2DEBUG(32, "next pointer will exceed rptr.....");
295 m_bufinfo->errtype = 3;
296 }
297 m_bufinfo->ninsq++;
299 return size;
300 } else {
301 // printf ( "insq: wptr<rptr; no more space below rptr(%d), wptr(%d)\n",
302 // m_bufinfo->rptr, m_bufinfo->wptr );
303 return -1;
304 }
305 }
306}
307
308int RingBuffer::remq(int* buf)
309{
310 SemaphoreLocker locker(m_semid);
311 if (m_bufinfo->nbuf < 0) {
312 throw std::runtime_error("[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
313 }
314 if (m_bufinfo->nbuf == 0) {
315 if (m_procIsBusy) {
316 m_bufinfo->nbusy--;
317 m_procIsBusy = false;
318 }
319 return 0;
320 }
321 int* r_ptr = m_buftop + m_bufinfo->rptr;
322 int nw = *r_ptr;
323 if (nw <= 0) {
324 printf("RingBuffer::remq : buffer size = %d, skipped\n", nw);
325 printf("RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
326 if (m_procIsBusy) {
327 m_bufinfo->nbusy--;
328 m_procIsBusy = false;
329 }
330 return 0;
331 }
332 if (buf)
333 memcpy(buf, r_ptr + 2, nw * sizeof(int));
334 m_bufinfo->rptr = *(r_ptr + 1);
335 if (m_bufinfo->rptr == 0)
336 m_bufinfo->errtype = 4;
337 m_bufinfo->nbuf--;
338 m_bufinfo->nremq++;
340
341 if (not m_procIsBusy) {
342 m_bufinfo->nbusy++;
343 m_procIsBusy = true;
344 }
345 return nw;
346}
347
348int RingBuffer::spyq(int* buf) const
349{
350 SemaphoreLocker locker(m_semid);
351 if (m_bufinfo->nbuf <= 0) {
352 return 0;
353 }
354 int* r_ptr = m_buftop + m_bufinfo->rptr;
355 int nw = *r_ptr;
356 if (nw <= 0) {
357 printf("RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358 printf("RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
359 return 0;
360 }
361 // printf ( "remq : taking buf from %d(%d)\n", m_bufinfo->rptr, nw );
362 // Copy buffer without modifying management parameters.
363 memcpy(buf, r_ptr + 2, nw * sizeof(int));
364 // Exit
365 return nw;
366}
367
369{
370 return m_bufinfo->nbuf;
371}
372
374{
375 SemaphoreLocker locker(m_semid);
376 if (m_bufinfo->numAttachedTx == -1) //first attach
377 m_bufinfo->numAttachedTx = 0;
378
379 m_bufinfo->numAttachedTx++;
380}
382{
383 SemaphoreLocker locker(m_semid);
384 m_bufinfo->numAttachedTx--;
385 if (m_bufinfo->numAttachedTx < 0) {
386 m_bufinfo->numAttachedTx = 0;
387 }
388}
390{
391 m_bufinfo->numAttachedTx = 0;
392 m_bufinfo->nbuf = 0;
393}
395{
396 SemaphoreLocker locker(m_semid);
397 //NOTE: numAttachedTx == -1 also means we should read data (i.e. initialization pending)
398 return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
399}
401{
402 SemaphoreLocker locker(m_semid);
403 return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
404}
405
407{
408 return m_bufinfo->ninsq;
409}
410
412{
413 return m_bufinfo->nremq;
414}
415
417{
418 return m_insq_counter;
419}
420
422{
423 return m_remq_counter;
424}
425
427{
428 SemaphoreLocker locker(m_semid);
429 // m_bufinfo->size = m_shmsize - sizeof ( struct RingBufInfo );
430 m_bufinfo->remain = m_bufinfo->size;
431 m_bufinfo->wptr = 0;
432 m_bufinfo->prevwptr = 0;
433 m_bufinfo->rptr = 0;
434 m_bufinfo->nbuf = 0;
435 m_bufinfo->ninsq = 0;
436 m_bufinfo->nremq = 0;
437
438 return 0;
439}
440
442{
443 int val = 1;
444 if (semctl(m_semid, 0, SETVAL, val) == -1) { //set 0th semaphore to semval
445 B2ERROR("Initializing semaphore with semctl() failed.");
446 }
447 clear();
448}
449
451{
453 return 0;
454
455 return clear();
456}
457
459{
460 return m_shmid;
461}
462
464{
465 SemaphoreLocker locker(m_semid);
466
467 // Dump control parameters
468 printf("***** Ring Buffer Information ***\n");
469 printf("path = %s\n", m_pathname.c_str());
470 printf("shmsize = %d\n", m_shmsize);
471 printf("[Buffer Info]\n");
472 printf("bufsize = %d\n", m_bufinfo->size);
473 printf("remain = %d\n", m_bufinfo->remain);
474 printf("wptr = %d\n", m_bufinfo->wptr);
475 printf("prevwptr = %d\n", m_bufinfo->prevwptr);
476 printf("rptr = %d\n", m_bufinfo->rptr);
477 printf("nbuf = %d\n", m_bufinfo->nbuf);
478 printf("nattached = %d\n", m_bufinfo->nattached);
479 printf("nbusy = %d\n", m_bufinfo->nbusy);
480 printf("errtype = %d\n", m_bufinfo->errtype);
481 printf("numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
482 printf("ninsq = %d\n", m_bufinfo->ninsq);
483 printf("nremq = %d\n", m_bufinfo->ninsq);
484}
void dump_db()
Print some info on the RingBufInfo structure.
~RingBuffer()
Destructor.
Definition RingBuffer.cc:79
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
int m_shmsize
Size of shared memory segment, in bytes.
Definition RingBuffer.h:128
struct RingBufInfo * m_bufinfo
structure to manage ring buffer.
Definition RingBuffer.h:129
int tryClear()
Clear the RingBuffer, if the semaphore isn't locked at the moment.
bool m_file
True if m_pathfd needs to be closed.
Definition RingBuffer.h:111
int remq_counter() const
Return number of remq() calls.
int clear()
Clear the RingBuffer.
void txAttached()
Increase the number of attached Tx counter.
int spyq(int *buf) const
Prefetch a buffer from the RingBuffer w/o removing it.
void cleanup()
Function to detach and remove shared memory.
int m_remq_counter
count remq() calls.
Definition RingBuffer.h:132
void txDetached()
Decrease the number of attached Tx counter.
key_t m_shmkey
SHM key, see shmget(2).
Definition RingBuffer.h:114
int m_semid
Semaphore ID.
Definition RingBuffer.h:131
int shmid() const
Return ID of the shared memory.
void dumpInfo() const
Dump contents of RingBufInfo metadata.
int numq() const
Returns number of entries/buffers in the RingBuffer.
int m_shmid
ID of shared memory segment.
Definition RingBuffer.h:126
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
void kill()
Cause termination of reading processes (if they use isDead()).
int remq(int *buf)
Pick up a buffer from the RingBuffer.
int m_pathfd
Associated file descriptor.
Definition RingBuffer.h:113
int insq_counter() const
Return number of insq() calls.
std::string m_pathname
Path for identifying shared memory if named ring buffer is created.
Definition RingBuffer.h:112
RingBuffer(int nwords=c_DefaultSize)
Constructor to create a new shared memory in private space.
Definition RingBuffer.cc:32
void openSHM(int nwords)
open shared memory
Definition RingBuffer.cc:84
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules (i.e.
bool m_procIsBusy
Is this process currently processing events from this RingBuffer?
Definition RingBuffer.h:124
int * m_buftop
Points to memory after the end of m_bufinfo.
Definition RingBuffer.h:130
int m_insq_counter
count insq() calls.
Definition RingBuffer.h:133
std::string m_semshmFileName
file path containing ids of shm and sema for private shared mem, used for easier cleanup if we fail t...
Definition RingBuffer.h:117
bool m_new
True if we created the ring buffer ourselves (and need to clean it).
Definition RingBuffer.h:110
int ninsq() const
Return number of insq() calls for current buffer.
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
key_t m_semkey
Semaphore key, see semget(2).
Definition RingBuffer.h:115
int nremq() const
Return number of remq() calls for current buffer.
int * m_shmadr
Address of attached shared memory segment.
Definition RingBuffer.h:127
Handles creation, locking and unlocking of System V semaphores.
static void destroy(int semId)
Destroy the given semaphore.
static int create(key_t semkey)
Create a new semaphore and initialize it.
static bool isLocked(int semId)
Return true if the given semaphore is locked.
Abstract base class for different kinds of events.
STL namespace.
Internal metadata structure for RingBuffer.
Definition RingBuffer.h:21