9#include <framework/pcore/RingBuffer.h>
10#include <framework/pcore/SemaphoreLocker.h>
11#include <framework/logging/Logger.h>
35 B2DEBUG(32,
"RingBuffer initialization done");
42 if (name !=
"private") {
44 m_pathname = std::string(std::filesystem::temp_directory_path() / getenv(
"USER")) +
"_RB_" + name;
47 B2DEBUG(32,
"[RingBuffer] Creating a ring buffer with key " << name);
49 }
else if (
m_pathfd == -1 && errno == EEXIST) {
50 B2DEBUG(32,
"[RingBuffer] Attaching the ring buffer with key " << name);
53 B2FATAL(
"RingBuffer: error opening shm file: " <<
m_pathname);
58 B2DEBUG(32,
"[RingBuffer] Opening private ring buffer");
64 B2DEBUG(32,
"First global RingBuffer creation: writing SHM info to file.");
66 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n",
m_shmid);
67 int is = write(
m_pathfd, rbufinfo, strlen(rbufinfo));
68 if (is < 0) perror(
"write");
69 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n",
m_semid);
70 is = write(
m_pathfd, rbufinfo, strlen(rbufinfo));
71 if (is < 0) perror(
"write");
76 B2DEBUG(32,
"RingBuffer initialization done with shm=" <<
m_shmid);
87 unsigned int sizeBytes = nwords *
sizeof(int);
88 const auto mode = IPC_CREAT | 0644;
91 unsigned int maxSizeBytes = sizeBytes;
92 ifstream shmax(
"/proc/sys/kernel/shmmax");
94 shmax >> maxSizeBytes;
97 B2WARNING(
"RingBuffer: shmget(" << sizeBytes <<
") failed, limiting to system maximum: " << maxSizeBytes);
98 sizeBytes = maxSizeBytes;
99 nwords = maxSizeBytes /
sizeof(int);
102 B2FATAL(
"RingBuffer: shmget(" << sizeBytes <<
103 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
108 B2FATAL(
"RingBuffer: Attaching to shared memory segment via shmat() failed");
115 B2FATAL(
"Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
140 B2DEBUG(32,
"[RingBuffer] check entries = " <<
m_bufinfo->
nbuf);
141 B2DEBUG(32,
"[RingBuffer] check size = " <<
m_bufinfo->
size);
149 m_semshmFileName = std::string(std::filesystem::temp_directory_path() /
"SHM") + to_string(
m_shmid) +
"-SEM" + to_string(
153 B2WARNING(
"RingBuffer ID file could not be created.");
171 B2DEBUG(32,
"RingBuffer: Cleaning up IPC");
173 shmctl(
m_shmid, IPC_RMID, (
struct shmid_ds*)
nullptr);
184 printf(
"bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
192 B2FATAL(
"RingBuffer::insq() failed: invalid buffer size = " << size);
196 B2WARNING(
"Number of attached Tx is 0, so I will not go on with the processing.");
204 throw std::runtime_error(
"[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
205 ") is larger than RingBuffer (size: " + std::to_string(
m_bufinfo->
size + 2) +
")!");
211 memcpy(wptr + 2, buf, size *
sizeof(
int));
226 B2DEBUG(32,
"[RingBuffer] errtype 3");
237 memcpy(wptr + 2, buf, size *
sizeof(
int));
253 memcpy(wptr + 2, buf, size *
sizeof(
int));
255 *(wptr + 1) = size + 2;
288 memcpy(wptr + 2, buf, size *
sizeof(
int));
294 B2DEBUG(32,
"next pointer will exceed rptr.....");
312 throw std::runtime_error(
"[RingBuffer::remq ()] number of entries is negative: " + std::to_string(
m_bufinfo->
nbuf));
324 printf(
"RingBuffer::remq : buffer size = %d, skipped\n", nw);
325 printf(
"RingBuffer::remq : entries = %d\n",
m_bufinfo->
nbuf);
333 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
357 printf(
"RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358 printf(
"RingBuffer::spyq : entries = %d\n",
m_bufinfo->
nbuf);
363 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
444 if (semctl(
m_semid, 0, SETVAL, val) == -1) {
445 B2ERROR(
"Initializing semaphore with semctl() failed.");
468 printf(
"***** Ring Buffer Information ***\n");
471 printf(
"[Buffer Info]\n");
void dump_db()
Print some info on the RingBufInfo structure.
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
int m_shmsize
Size of shared memory segment, in bytes.
struct RingBufInfo * m_bufinfo
structure to manage ring buffer.
int tryClear()
Clear the RingBuffer, if the semaphore isn't locked at the moment.
bool m_file
True if m_pathfd needs to be closed.
int remq_counter() const
Return number of remq() calls.
int clear()
Clear the RingBuffer.
void txAttached()
Increase the number of attached Tx counter.
int spyq(int *buf) const
Prefetch a buffer from the RingBuffer w/o removing it.
void cleanup()
Function to detach and remove shared memory.
int m_remq_counter
count remq() calls.
void txDetached()
Decrease the number of attached Tx counter.
key_t m_shmkey
SHM key, see shmget(2).
int shmid() const
Return ID of the shared memory.
void dumpInfo() const
Dump contents of RingBufInfo metadata.
int numq() const
Returns number of entries/buffers in the RingBuffer.
int m_shmid
ID of shared memory segment.
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
void kill()
Cause termination of reading processes (if they use isDead()).
int remq(int *buf)
Pick up a buffer from the RingBuffer.
int m_pathfd
Associated file descriptor.
int insq_counter() const
Return number of insq() calls.
std::string m_pathname
Path for identifying shared memory if named ring buffer is created.
RingBuffer(int nwords=c_DefaultSize)
Constructor to create a new shared memory in private space.
void openSHM(int nwords)
open shared memory
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules (i.e.
bool m_procIsBusy
Is this process currently processing events from this RingBuffer?
int * m_buftop
Points to memory after the end of m_bufinfo.
int m_insq_counter
count insq() calls.
std::string m_semshmFileName
file path containing ids of shm and sema for private shared mem, used for easier cleanup if we fail t...
bool m_new
True if we created the ring buffer ourselves (and need to clean it).
int ninsq() const
Return number of insq() calls for current buffer.
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
key_t m_semkey
Semaphore key, see semget(2).
int nremq() const
Return number of remq() calls for current buffer.
int * m_shmadr
Address of attached shared memory segment.
Handles creation, locking and unlocking of System V semaphores.
static void destroy(int semId)
Destroy the given semaphore.
static int create(key_t semkey)
Create a new semaphore and initialize it.
static bool isLocked(int semId)
Return true if the given semaphore is locked.
Abstract base class for different kinds of events.
Internal metadata structure for RingBuffer.
int numAttachedTx
number of attached sending processes.
int errtype
Error state? 0: Normal, 1: buffer full and wptr>rptr, others are complicated.
int wptr
Pointer for writing entries.
int size
ring buffer size (integers), minus this header.
int nattached
Number of RingBuffer instances currently attached to this buffer.
int nbusy
Number of attached reading processes currently processing events.
int nremq
Count remq() calls for this buffer.
int nbuf
Number of entries in ring buffer.
int rptr
Pointer for reading entries.
int remain
Unsure, always equal to size.
int prevwptr
Previous state of wptr (for error recovery).
int ninsq
Count insq() calls for this buffer.