9 #include <framework/pcore/RingBuffer.h>
10 #include <framework/pcore/SemaphoreLocker.h>
11 #include <framework/logging/Logger.h>
// Constructor taking a buffer size. Only the signature and the final debug
// message are visible in this excerpt; the initialization steps in between are not shown.
32 RingBuffer::RingBuffer(
int size)
35 B2DEBUG(32,
"RingBuffer initialization done");
// Constructor for a named ring buffer.
//   name   - buffer key; the literal "private" selects a private (unnamed) buffer.
//   nwords - requested size (presumably in int-sized words, since openSHM()
//            multiplies its nwords argument by sizeof(int) - confirm the call site).
// For any other name a key file <tmpdir>/<USER>_RB_<name> is opened and the
// System V shared-memory / semaphore keys are derived from it with ftok().
39 RingBuffer::RingBuffer(
const std::string& name,
unsigned int nwords)
42 if (name !=
"private") {
// NOTE(review): getenv("USER") returns a null pointer when USER is unset;
// confirm this cannot happen in the environments this runs in.
44 m_pathname = std::string(std::filesystem::temp_directory_path() / getenv(
"USER")) +
"_RB_" + name;
// O_CREAT | O_EXCL: the open only succeeds for the process that creates the key file.
45 m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
47 B2DEBUG(32,
"[RingBuffer] Creating a ring buffer with key " << name);
49 }
// open() failing with EEXIST means the key file already exists, i.e. we attach.
else if (m_pathfd == -1 && errno == EEXIST) {
50 B2DEBUG(32,
"[RingBuffer] Attaching the ring buffer with key " << name);
53 B2FATAL(
"RingBuffer: error opening shm file: " << m_pathname);
// Same path, different project ids (1 and 2) for the shm key and the semaphore key.
55 m_shmkey = ftok(m_pathname.c_str(), 1);
56 m_semkey = ftok(m_pathname.c_str(), 2);
58 B2DEBUG(32,
"[RingBuffer] Opening private ring buffer");
64 B2DEBUG(32,
"First global RingBuffer creation: writing SHM info to file.");
// The shm id and the semaphore id are written to the key file as decimal text, one per line.
66 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n", m_shmid);
67 int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
// NOTE(review): a failed write is only reported via perror(); execution continues.
68 if (is < 0) perror(
"write");
69 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n", m_semid);
70 is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
71 if (is < 0) perror(
"write");
76 B2DEBUG(32,
"RingBuffer initialization done with shm=" << m_shmid);
// Destructor; its body is not visible in this excerpt.
79 RingBuffer::~RingBuffer()
// Obtains the shared memory segment for m_shmkey (nwords * sizeof(int) bytes),
// attaches it, creates the semaphore for m_semkey, sets up the RingBufInfo header
// at the start of the segment and writes an ID file to the temp directory.
// Unrecoverable failures (shmget, shmat, semaphore creation) end in B2FATAL.
84 void RingBuffer::openSHM(
int nwords)
87 unsigned int sizeBytes = nwords *
sizeof(int);
88 const auto mode = IPC_CREAT | 0644;
89 m_shmid = shmget(m_shmkey, sizeBytes, mode);
// Fallback path: read the system limit and retry shmget() with that size.
// NOTE(review): maxSizeBytes is an unsigned int; confirm what happens when the
// value in /proc/sys/kernel/shmmax does not fit into that type.
91 unsigned int maxSizeBytes = sizeBytes;
92 ifstream shmax(
"/proc/sys/kernel/shmmax");
94 shmax >> maxSizeBytes;
97 B2WARNING(
"RingBuffer: shmget(" << sizeBytes <<
") failed, limiting to system maximum: " << maxSizeBytes);
98 sizeBytes = maxSizeBytes;
99 nwords = maxSizeBytes /
sizeof(int);
100 m_shmid = shmget(m_shmkey, sizeBytes, mode);
102 B2FATAL(
"RingBuffer: shmget(" << sizeBytes <<
103 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
// shmat() signals failure by returning (void*) -1, hence the comparison below.
106 m_shmadr = (
int*) shmat(m_shmid,
nullptr, 0);
107 if (m_shmadr == (
int*) - 1) {
108 B2FATAL(
"RingBuffer: Attaching to shared memory segment via shmat() failed");
112 m_semid = SemaphoreLocker::create(m_semkey);
115 B2FATAL(
"Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
// The RingBufInfo header lives at the very beginning of the attached segment.
121 m_bufinfo =
reinterpret_cast<RingBufInfo*
>(m_shmadr);
// Initial header values; numAttachedTx starts at -1 (txAttached() treats -1 specially).
125 m_bufinfo->remain = m_bufinfo->size;
127 m_bufinfo->prevwptr = 0;
130 m_bufinfo->semid = m_semid;
131 m_bufinfo->nattached = 1;
132 m_bufinfo->nbusy = 0;
133 m_bufinfo->errtype = 0;
134 m_bufinfo->numAttachedTx = -1;
135 m_bufinfo->ninsq = 0;
136 m_bufinfo->nremq = 0;
// Presumably the attach-to-existing branch (the branching lines are not visible here).
138 m_bufinfo->nattached++;
140 B2DEBUG(32,
"[RingBuffer] check entries = " << m_bufinfo->nbuf);
141 B2DEBUG(32,
"[RingBuffer] check size = " << m_bufinfo->size);
// ID file <tmpdir>/SHM<shmid>-SEM<semid>-UNNAMED; failure to create it only triggers a warning.
149 m_semshmFileName = std::string(std::filesystem::temp_directory_path() /
"SHM") + to_string(m_shmid) +
"-SEM" + to_string(
150 m_semid) +
"-UNNAMED";
151 int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
153 B2WARNING(
"RingBuffer ID file could not be created.");
159 B2DEBUG(35,
"buftop = " << m_buftop <<
", end = " << (m_buftop + m_bufinfo->size));
// Releases the IPC resources: clears this process' busy flag, marks the shared
// memory segment for removal (IPC_RMID), destroys the semaphore and unlinks both
// the key file (m_pathname) and the ID file (m_semshmFileName).
// NOTE(review): the conditions guarding these steps are not visible in this excerpt.
162 void RingBuffer::cleanup()
167 m_procIsBusy =
false;
171 B2DEBUG(32,
"RingBuffer: Cleaning up IPC");
173 shmctl(m_shmid, IPC_RMID, (
struct shmid_ds*)
nullptr);
174 SemaphoreLocker::destroy(m_semid);
176 unlink(m_pathname.c_str());
178 unlink(m_semshmFileName.c_str());
// Prints the main header fields (size, remain, wptr, rptr, nbuf) on one line to stdout.
182 void RingBuffer::dump_db()
184 printf(
"bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
185 m_bufinfo->size, m_bufinfo->remain,
186 m_bufinfo->wptr, m_bufinfo->rptr, m_bufinfo->nbuf);
// Inserts one record of `size` ints from `buf` into the ring buffer.
//   buf     - source data.
//   size    - number of ints to copy; an invalid size ends in B2FATAL.
//   checkTx - when true, nothing is inserted if no transmitter is attached
//             (numAttachedTx == 0); a warning is logged instead.
// Throws std::runtime_error when the record is reported as larger than the buffer.
// Every record occupies size + 2 words: the payload is copied to wptr + 2 and the
// word at wptr + 1 stores the offset of the following record. (The word at wptr + 0
// is written on lines not visible in this excerpt.)
// m_bufinfo->errtype is used as a state marker (values 0..4) describing the relative
// position of wptr and rptr; the return statements are not visible here.
189 int RingBuffer::insq(
const int* buf,
int size,
bool checkTx)
192 B2FATAL(
"RingBuffer::insq() failed: invalid buffer size = " << size);
194 if (m_bufinfo->numAttachedTx == 0 and checkTx) {
196 B2WARNING(
"Number of attached Tx is 0, so I will not go on with the processing.");
// Case: buffer currently empty.
200 if (m_bufinfo->nbuf == 0) {
// NOTE(review): a record needs size + 2 words, yet this compares size against
// m_bufinfo->size + 2 - confirm the intended bound.
203 if (size > m_bufinfo->size + 2) {
204 throw std::runtime_error(
"[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
205 ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) +
")!");
207 m_bufinfo->errtype = 0;
208 int* wptr = m_buftop + m_bufinfo->wptr;
210 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
211 memcpy(wptr + 2, buf, size *
sizeof(
int));
212 m_bufinfo->prevwptr = m_bufinfo->wptr;
213 m_bufinfo->wptr += (size + 2);
218 }
// Case: write pointer is ahead of the read pointer.
else if (m_bufinfo->wptr > m_bufinfo->rptr) {
219 if (m_bufinfo->errtype != 4 &&
220 m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
221 B2ERROR(
"insq: Error in errtype 0; current=" << m_bufinfo->errtype);
224 if (m_bufinfo->errtype == 3) {
226 B2DEBUG(32,
"[RingBuffer] errtype 3");
228 }
else if (m_bufinfo->errtype == 4) {
232 m_bufinfo->errtype = 0;
// Enough room between wptr and the end of the buffer: append in place.
233 if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) {
234 int* wptr = m_buftop + m_bufinfo->wptr;
236 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
237 memcpy(wptr + 2, buf, size *
sizeof(
int));
238 m_bufinfo->prevwptr = m_bufinfo->wptr;
239 m_bufinfo->wptr += (size + 2);
// Otherwise wrap around: write at the top of the buffer if the reader has freed enough space there.
245 if (m_bufinfo->rptr >= size + 2) {
247 if (m_bufinfo->errtype != 0) {
248 B2ERROR(
"insq: Error in errtype 1; current=" << m_bufinfo->errtype);
251 m_bufinfo->errtype = 1;
252 int* wptr = m_buftop;
253 memcpy(wptr + 2, buf, size *
sizeof(
int));
255 *(wptr + 1) = size + 2;
256 m_bufinfo->wptr = *(wptr + 1);
// prevptr addresses the previously written record (its use is on a line not visible here).
257 int* prevptr = m_buftop + m_bufinfo->prevwptr;
259 m_bufinfo->prevwptr = 0;
260 if (m_bufinfo->nbuf == 0) {
261 m_bufinfo->errtype = 4;
// Case: write pointer is behind the read pointer; the record must fit in the gap before rptr.
277 if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
278 size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
279 if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
280 m_bufinfo->errtype != 3) {
281 B2ERROR(
"insq: Error in errtype 2; current=" << m_bufinfo->errtype);
284 m_bufinfo->errtype = 2;
285 int* wptr = m_buftop + m_bufinfo->wptr;
287 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
288 memcpy(wptr + 2, buf, size *
sizeof(
int));
289 m_bufinfo->prevwptr = m_bufinfo->wptr;
290 m_bufinfo->wptr += (size + 2);
293 if (m_bufinfo->wptr > m_bufinfo->rptr) {
294 B2DEBUG(32,
"next pointer will exceed rptr.....");
295 m_bufinfo->errtype = 3;
// Removes the record at the read pointer and copies its payload into `buf`.
//   buf - destination; must be large enough for the record (the size `nw` is
//         obtained on a line not visible in this excerpt).
// Throws std::runtime_error when the entry counter nbuf is negative.
// The return statements are not visible in this excerpt.
308 int RingBuffer::remq(
int* buf)
311 if (m_bufinfo->nbuf < 0) {
312 throw std::runtime_error(
"[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
// Empty buffer: this process is flagged as not busy.
314 if (m_bufinfo->nbuf == 0) {
317 m_procIsBusy =
false;
321 int* r_ptr = m_buftop + m_bufinfo->rptr;
// Diagnostic output for a record that is skipped because of its size value.
324 printf(
"RingBuffer::remq : buffer size = %d, skipped\n", nw);
325 printf(
"RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
328 m_procIsBusy =
false;
// Payload starts two words after the record start; word 1 holds the offset of the next record.
333 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
334 m_bufinfo->rptr = *(r_ptr + 1);
// A next-offset of 0 sets errtype to 4, a state insq() checks for.
335 if (m_bufinfo->rptr == 0)
336 m_bufinfo->errtype = 4;
341 if (not m_procIsBusy) {
// Copies the payload of the record at the read pointer into `buf`. None of the
// visible lines modifies the buffer state, in contrast to remq().
//   buf - destination; must be large enough for the record (the size `nw` is
//         obtained on a line not visible in this excerpt).
348 int RingBuffer::spyq(
int* buf)
const
// Nothing to read when the buffer holds no entries.
351 if (m_bufinfo->nbuf <= 0) {
354 int* r_ptr = m_buftop + m_bufinfo->rptr;
357 printf(
"RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358 printf(
"RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
// Payload starts two words after the record start.
363 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
// Returns the number of entries currently stored (m_bufinfo->nbuf).
368 int RingBuffer::numq()
const
370 return m_bufinfo->nbuf;
// Registers one more attached transmitter. The counter starts at -1 (set in
// openSHM()); that initial value is first reset to 0 before incrementing.
373 void RingBuffer::txAttached()
376 if (m_bufinfo->numAttachedTx == -1)
377 m_bufinfo->numAttachedTx = 0;
379 m_bufinfo->numAttachedTx++;
// Unregisters one transmitter; the counter is clamped so it never drops below 0.
381 void RingBuffer::txDetached()
384 m_bufinfo->numAttachedTx--;
385 if (m_bufinfo->numAttachedTx < 0) {
386 m_bufinfo->numAttachedTx = 0;
// Forces the attached-transmitter count to 0. With that value isDead() returns
// true once the buffer is empty, and insq() with checkTx set refuses to insert.
389 void RingBuffer::kill()
391 m_bufinfo->numAttachedTx = 0;
// Returns true when no transmitter is attached (numAttachedTx == 0) and the buffer
// holds no entries. The initial numAttachedTx value of -1 does not count as dead.
394 bool RingBuffer::isDead()
const
398 return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
// Returns true when no process is flagged busy (nbusy == 0) and the buffer is empty.
400 bool RingBuffer::allRxWaiting()
const
403 return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
// Returns the ninsq counter stored in the shared buffer header.
406 int RingBuffer::ninsq()
const
408 return m_bufinfo->ninsq;
// Returns the nremq counter stored in the shared buffer header.
411 int RingBuffer::nremq()
const
413 return m_bufinfo->nremq;
// Returns m_insq_counter, a member of this object (not read from the shared header).
416 int RingBuffer::insq_counter()
const
418 return m_insq_counter;
// Returns m_remq_counter, a member of this object (not read from the shared header).
421 int RingBuffer::remq_counter()
const
423 return m_remq_counter;
// Resets the buffer header: remain back to the full size, prevwptr to 0 and the
// ninsq / nremq counters to 0. Further resets and the return value are on lines
// not visible in this excerpt.
426 int RingBuffer::clear()
430 m_bufinfo->remain = m_bufinfo->size;
432 m_bufinfo->prevwptr = 0;
435 m_bufinfo->ninsq = 0;
436 m_bufinfo->nremq = 0;
// Re-initialises semaphore 0 of m_semid via semctl(SETVAL); a failure is logged
// with B2ERROR. `val` is set up on a line not visible in this excerpt.
441 void RingBuffer::forceClear()
444 if (semctl(m_semid, 0, SETVAL, val) == -1) {
445 B2ERROR(
"Initializing semaphore with semctl() failed.");
// Checks whether the semaphore is currently locked; what happens in either case
// is on lines not visible in this excerpt.
450 int RingBuffer::tryClear()
452 if (SemaphoreLocker::isLocked(m_semid))
// Accessor named after the shared memory id; its body is not visible in this excerpt.
458 int RingBuffer::shmid()
const
// Prints the path, the shared memory size and every field of the shared
// RingBufInfo header to stdout, one "name = value" line per field.
463 void RingBuffer::dumpInfo()
const
468 printf(
"***** Ring Buffer Information ***\n");
469 printf(
"path = %s\n", m_pathname.c_str());
470 printf(
"shmsize = %d\n", m_shmsize);
471 printf(
"[Buffer Info]\n");
472 printf(
"bufsize = %d\n", m_bufinfo->size);
473 printf(
"remain = %d\n", m_bufinfo->remain);
474 printf(
"wptr = %d\n", m_bufinfo->wptr);
475 printf(
"prevwptr = %d\n", m_bufinfo->prevwptr);
476 printf(
"rptr = %d\n", m_bufinfo->rptr);
477 printf(
"nbuf = %d\n", m_bufinfo->nbuf);
478 printf(
"nattached = %d\n", m_bufinfo->nattached);
479 printf(
"nbusy = %d\n", m_bufinfo->nbusy);
480 printf(
"errtype = %d\n", m_bufinfo->errtype);
481 printf(
"numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
482 printf(
"ninsq = %d\n", m_bufinfo->ninsq);
// Print the removal counter itself; this line previously printed ninsq a second time.
483 printf(
"nremq = %d\n", m_bufinfo->nremq);
Handles creation, locking and unlocking of System V semaphores.
Abstract base class for different kinds of events.
Internal metadata structure for RingBuffer.
int size
ring buffer size (integers), minus this header.