9#include <framework/pcore/RingBuffer.h> 
   10#include <framework/pcore/SemaphoreLocker.h> 
   11#include <framework/logging/Logger.h> 
   35  B2DEBUG(32, 
"RingBuffer initialization done");
 
 
   42  if (name != 
"private") { 
 
   44    m_pathname = std::string(std::filesystem::temp_directory_path() / getenv(
"USER")) + 
"_RB_" + name;
 
   47      B2DEBUG(32, 
"[RingBuffer] Creating a ring buffer with key " << name);
 
   49    } 
else if (
m_pathfd == -1 && errno == EEXIST) { 
 
   50      B2DEBUG(32, 
"[RingBuffer] Attaching the ring buffer with key " << name);
 
   53      B2FATAL(
"RingBuffer: error opening shm file: " << 
m_pathname);
 
   58    B2DEBUG(32, 
"[RingBuffer] Opening private ring buffer");
 
   64    B2DEBUG(32, 
"First global RingBuffer creation: writing SHM info to file.");
 
   66    snprintf(rbufinfo, 
sizeof(rbufinfo), 
"%d\n", 
m_shmid);
 
   67    int is = write(
m_pathfd, rbufinfo, strlen(rbufinfo));
 
   68    if (is < 0) perror(
"write");
 
   69    snprintf(rbufinfo, 
sizeof(rbufinfo), 
"%d\n", 
m_semid);
 
   70    is = write(
m_pathfd, rbufinfo, strlen(rbufinfo));
 
   71    if (is < 0) perror(
"write");
 
   76  B2DEBUG(32, 
"RingBuffer initialization done with shm=" << 
m_shmid);
 
 
   87  unsigned int sizeBytes = nwords * 
sizeof(int);
 
   88  const auto mode = IPC_CREAT | 0644;
 
   91    unsigned int maxSizeBytes = sizeBytes;
 
   92    ifstream shmax(
"/proc/sys/kernel/shmmax");
 
   94      shmax >> maxSizeBytes;
 
   97    B2WARNING(
"RingBuffer: shmget(" << sizeBytes << 
") failed, limiting to system maximum: " << maxSizeBytes);
 
   98    sizeBytes = maxSizeBytes;
 
   99    nwords = maxSizeBytes / 
sizeof(int);
 
  102      B2FATAL(
"RingBuffer: shmget(" << sizeBytes <<
 
  103              ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
 
  108    B2FATAL(
"RingBuffer: Attaching to shared memory segment via shmat() failed");
 
  115    B2FATAL(
"Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
 
  140    B2DEBUG(32, 
"[RingBuffer] check entries = " << 
m_bufinfo->nbuf);
 
  141    B2DEBUG(32, 
"[RingBuffer] check size = " << 
m_bufinfo->size);
 
  149    m_semshmFileName = std::string(std::filesystem::temp_directory_path() / 
"SHM") + to_string(
m_shmid) + 
"-SEM" + to_string(
 
  153      B2WARNING(
"RingBuffer ID file could not be created.");
 
 
  171  B2DEBUG(32, 
"RingBuffer: Cleaning up IPC");
 
  173    shmctl(
m_shmid, IPC_RMID, (
struct shmid_ds*) 
nullptr);
 
 
  184  printf(
"bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
 
 
  192    B2FATAL(
"RingBuffer::insq() failed: invalid buffer size = " << size);
 
  194  if (
m_bufinfo->numAttachedTx == 0 and checkTx) {
 
  196    B2WARNING(
"Number of attached Tx is 0, so I will not go on with the processing.");
 
  204      throw std::runtime_error(
"[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
 
  205                               ") is larger than RingBuffer (size: " + std::to_string(
m_bufinfo->size + 2) + 
")!");
 
  210    *(wptr + 1) = 
m_bufinfo->wptr + (size + 2);
 
  211    memcpy(wptr + 2, buf, size * 
sizeof(
int));
 
  221      B2ERROR(
"insq: Error in errtype 0; current=" << 
m_bufinfo->errtype);
 
  226      B2DEBUG(32, 
"[RingBuffer] errtype 3");
 
  236      *(wptr + 1) = 
m_bufinfo->wptr + (size + 2);
 
  237      memcpy(wptr + 2, buf, size * 
sizeof(
int));
 
  248          B2ERROR(
"insq: Error in errtype 1; current=" << 
m_bufinfo->errtype);
 
  253        memcpy(wptr + 2, buf, size * 
sizeof(
int));
 
  255        *(wptr + 1) = size + 2;
 
  281        B2ERROR(
"insq: Error in errtype 2; current=" << 
m_bufinfo->errtype);
 
  287      *(wptr + 1) = 
m_bufinfo->wptr + (size + 2);
 
  288      memcpy(wptr + 2, buf, size * 
sizeof(
int));
 
  294        B2DEBUG(32, 
"next pointer will exceed rptr.....");
 
 
  312    throw std::runtime_error(
"[RingBuffer::remq ()] number of entries is negative: " + std::to_string(
m_bufinfo->nbuf));
 
  324    printf(
"RingBuffer::remq : buffer size = %d, skipped\n", nw);
 
  325    printf(
"RingBuffer::remq : entries = %d\n", 
m_bufinfo->nbuf);
 
  333    memcpy(buf, r_ptr + 2, nw * 
sizeof(
int));
 
 
  357    printf(
"RingBuffer::spyq : buffer size = %d, skipped\n", nw);
 
  358    printf(
"RingBuffer::spyq : entries = %d\n", 
m_bufinfo->nbuf);
 
  363  memcpy(buf, r_ptr + 2, nw * 
sizeof(
int));
 
 
  444  if (semctl(
m_semid, 0, SETVAL, val) == -1) { 
 
  445    B2ERROR(
"Initializing semaphore with semctl() failed.");
 
 
  468  printf(
"***** Ring Buffer Information ***\n");
 
  471  printf(
"[Buffer Info]\n");
 
  472  printf(
"bufsize = %d\n", 
m_bufinfo->size);
 
  473  printf(
"remain = %d\n", 
m_bufinfo->remain);
 
  475  printf(
"prevwptr = %d\n", 
m_bufinfo->prevwptr);
 
  478  printf(
"nattached = %d\n", 
m_bufinfo->nattached);
 
  479  printf(
"nbusy = %d\n", 
m_bufinfo->nbusy);
 
  480  printf(
"errtype = %d\n", 
m_bufinfo->errtype);
 
  481  printf(
"numAttachedTx = %d\n", 
m_bufinfo->numAttachedTx);
 
  482  printf(
"ninsq = %d\n", 
m_bufinfo->ninsq);
 
  483  printf(
"nremq = %d\n", 
m_bufinfo->nremq);
 
 
void dump_db()
Print some info on the RingBufInfo structure.
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
int m_shmsize
Size of shared memory segment, in bytes.
struct RingBufInfo * m_bufinfo
Structure to manage the ring buffer.
int tryClear()
Clear the RingBuffer, if the semaphore isn't locked at the moment.
bool m_file
True if m_pathfd needs to be closed.
int remq_counter() const
Return number of remq() calls.
int clear()
Clear the RingBuffer.
void txAttached()
Increase the number of attached Tx counter.
int spyq(int *buf) const
Prefetch a buffer from the RingBuffer w/o removing it.
void cleanup()
Function to detach and remove shared memory.
int m_remq_counter
count remq() calls.
void txDetached()
Decrease the number of attached Tx counter.
key_t m_shmkey
SHM key, see shmget(2).
int shmid() const
Return ID of the shared memory.
void dumpInfo() const
Dump contents of RingBufInfo metadata.
int numq() const
Returns number of entries/buffers in the RingBuffer.
int m_shmid
ID of shared memory segment.
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
void kill()
Cause termination of reading processes (if they use isDead()).
int remq(int *buf)
Pick up a buffer from the RingBuffer.
int m_pathfd
Associated file descriptor.
int insq_counter() const
Return number of insq() calls.
std::string m_pathname
Path for identifying shared memory if named ring buffer is created.
RingBuffer(int nwords=c_DefaultSize)
Constructor to create a new shared memory in private space.
void openSHM(int nwords)
Open the shared memory segment.
bool isDead() const
If true, the ring buffer is empty and has no attached Tx modules (i.e. no further data is expected to be inserted).
bool m_procIsBusy
Is this process currently processing events from this RingBuffer?
int * m_buftop
Points to memory after the end of m_bufinfo.
int m_insq_counter
count insq() calls.
std::string m_semshmFileName
File path containing the IDs of the shared memory segment and the semaphore for private shared memory, used for easier cleanup if we fail to clean up ourselves.
bool m_new
True if we created the ring buffer ourselves (and need to clean it).
int ninsq() const
Return number of insq() calls for current buffer.
void forceClear()
Forcefully clear the RingBuffer with resetting semaphore.
key_t m_semkey
Semaphore key, see semget(2).
int nremq() const
Return number of remq() calls for current buffer.
int * m_shmadr
Address of attached shared memory segment.
Handles creation, locking and unlocking of System V semaphores.
static void destroy(int semId)
Destroy the given semaphore.
static int create(key_t semkey)
Create a new semaphore and initialize it.
static bool isLocked(int semId)
Return true if the given semaphore is locked.
Abstract base class for different kinds of events.
Internal metadata structure for RingBuffer.