9 #include <framework/pcore/RingBuffer.h>
10 #include <framework/pcore/SemaphoreLocker.h>
11 #include <framework/logging/Logger.h>
31 RingBuffer::RingBuffer(
int size)
34 B2DEBUG(32,
"RingBuffer initialization done");
38 RingBuffer::RingBuffer(
const std::string& name,
unsigned int nwords)
41 if (name !=
"private") {
43 m_pathname = string(
"/tmp/") + getenv(
"USER") +
"_RB_" + name;
44 m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
46 B2DEBUG(32,
"[RingBuffer] Creating a ring buffer with key " << name);
48 }
else if (m_pathfd == -1 && errno == EEXIST) {
49 B2DEBUG(32,
"[RingBuffer] Attaching the ring buffer with key " << name);
52 B2FATAL(
"RingBuffer: error opening shm file: " << m_pathname);
54 m_shmkey = ftok(m_pathname.c_str(), 1);
55 m_semkey = ftok(m_pathname.c_str(), 2);
57 B2DEBUG(32,
"[RingBuffer] Opening private ring buffer");
63 B2DEBUG(32,
"First global RingBuffer creation: writing SHM info to file.");
65 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n", m_shmid);
66 int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
67 if (is < 0) perror(
"write");
68 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n", m_semid);
69 is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
70 if (is < 0) perror(
"write");
75 B2DEBUG(32,
"RingBuffer initialization done with shm=" << m_shmid);
78 RingBuffer::~RingBuffer()
83 void RingBuffer::openSHM(
int nwords)
86 unsigned int sizeBytes = nwords *
sizeof(int);
87 const auto mode = IPC_CREAT | 0644;
88 m_shmid = shmget(m_shmkey, sizeBytes, mode);
90 unsigned int maxSizeBytes = sizeBytes;
91 ifstream shmax(
"/proc/sys/kernel/shmmax");
93 shmax >> maxSizeBytes;
96 B2WARNING(
"RingBuffer: shmget(" << sizeBytes <<
") failed, limiting to system maximum: " << maxSizeBytes);
97 sizeBytes = maxSizeBytes;
98 nwords = maxSizeBytes /
sizeof(int);
99 m_shmid = shmget(m_shmkey, sizeBytes, mode);
101 B2FATAL(
"RingBuffer: shmget(" << sizeBytes <<
102 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
105 m_shmadr = (
int*) shmat(m_shmid,
nullptr, 0);
106 if (m_shmadr == (
int*) - 1) {
107 B2FATAL(
"RingBuffer: Attaching to shared memory segment via shmat() failed");
111 m_semid = SemaphoreLocker::create(m_semkey);
114 B2FATAL(
"Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
120 m_bufinfo =
reinterpret_cast<RingBufInfo*
>(m_shmadr);
124 m_bufinfo->remain = m_bufinfo->size;
126 m_bufinfo->prevwptr = 0;
129 m_bufinfo->semid = m_semid;
130 m_bufinfo->nattached = 1;
131 m_bufinfo->nbusy = 0;
132 m_bufinfo->errtype = 0;
133 m_bufinfo->numAttachedTx = -1;
134 m_bufinfo->ninsq = 0;
135 m_bufinfo->nremq = 0;
137 m_bufinfo->nattached++;
139 B2DEBUG(32,
"[RingBuffer] check entries = " << m_bufinfo->nbuf);
140 B2DEBUG(32,
"[RingBuffer] check size = " << m_bufinfo->size);
148 m_semshmFileName =
"/tmp/SHM" + to_string(m_shmid) +
"-SEM" + to_string(m_semid) +
"-UNNAMED";
149 int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
151 B2WARNING(
"RingBuffer ID file could not be created.");
157 B2DEBUG(35,
"buftop = " << m_buftop <<
", end = " << (m_buftop + m_bufinfo->size));
160 void RingBuffer::cleanup()
165 m_procIsBusy =
false;
169 B2DEBUG(32,
"RingBuffer: Cleaning up IPC");
171 shmctl(m_shmid, IPC_RMID, (
struct shmid_ds*)
nullptr);
172 SemaphoreLocker::destroy(m_semid);
174 unlink(m_pathname.c_str());
176 unlink(m_semshmFileName.c_str());
180 void RingBuffer::dump_db()
182 printf(
"bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
183 m_bufinfo->size, m_bufinfo->remain,
184 m_bufinfo->wptr, m_bufinfo->rptr, m_bufinfo->nbuf);
187 int RingBuffer::insq(
const int* buf,
int size,
bool checkTx)
190 B2FATAL(
"RingBuffer::insq() failed: invalid buffer size = " << size);
192 if (m_bufinfo->numAttachedTx == 0 and checkTx) {
194 B2WARNING(
"Number of attached Tx is 0, so I will not go on with the processing.");
198 if (m_bufinfo->nbuf == 0) {
201 if (size > m_bufinfo->size + 2) {
202 throw std::runtime_error(
"[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
203 ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) +
")!");
206 m_bufinfo->errtype = 0;
207 int* wptr = m_buftop + m_bufinfo->wptr;
209 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
210 memcpy(wptr + 2, buf, size *
sizeof(
int));
211 m_bufinfo->prevwptr = m_bufinfo->wptr;
212 m_bufinfo->wptr += (size + 2);
217 }
else if (m_bufinfo->wptr > m_bufinfo->rptr) {
218 if (m_bufinfo->errtype != 4 &&
219 m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
220 B2ERROR(
"insq: Error in errtype 0; current=" << m_bufinfo->errtype);
223 if (m_bufinfo->errtype == 3) {
225 B2DEBUG(32,
"[RingBuffer] errtype 3");
227 }
else if (m_bufinfo->errtype == 4) {
231 m_bufinfo->errtype = 0;
232 if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) {
233 int* wptr = m_buftop + m_bufinfo->wptr;
235 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
236 memcpy(wptr + 2, buf, size *
sizeof(
int));
237 m_bufinfo->prevwptr = m_bufinfo->wptr;
238 m_bufinfo->wptr += (size + 2);
244 if (m_bufinfo->rptr >= size + 2) {
247 if (m_bufinfo->errtype != 0) {
248 B2ERROR(
"insq: Error in errtype 1; current=" << m_bufinfo->errtype);
251 m_bufinfo->errtype = 1;
252 int* wptr = m_buftop;
253 memcpy(wptr + 2, buf, size *
sizeof(
int));
255 *(wptr + 1) = size + 2;
256 m_bufinfo->wptr = *(wptr + 1);
257 int* prevptr = m_buftop + m_bufinfo->prevwptr;
259 m_bufinfo->prevwptr = 0;
260 if (m_bufinfo->nbuf == 0) {
261 m_bufinfo->errtype = 4;
277 if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
278 size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
279 if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
280 m_bufinfo->errtype != 3) {
281 B2ERROR(
"insq: Error in errtype 2; current=" << m_bufinfo->errtype);
284 m_bufinfo->errtype = 2;
285 int* wptr = m_buftop + m_bufinfo->wptr;
287 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
288 memcpy(wptr + 2, buf, size *
sizeof(
int));
289 m_bufinfo->prevwptr = m_bufinfo->wptr;
290 m_bufinfo->wptr += (size + 2);
293 if (m_bufinfo->wptr > m_bufinfo->rptr) {
294 B2DEBUG(32,
"next pointer will exceed rptr.....");
295 m_bufinfo->errtype = 3;
308 int RingBuffer::remq(
int* buf)
311 if (m_bufinfo->nbuf < 0) {
312 throw std::runtime_error(
"[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
314 if (m_bufinfo->nbuf == 0) {
317 m_procIsBusy =
false;
321 int* r_ptr = m_buftop + m_bufinfo->rptr;
324 printf(
"RingBuffer::remq : buffer size = %d, skipped\n", nw);
325 printf(
"RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
328 m_procIsBusy =
false;
333 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
334 m_bufinfo->rptr = *(r_ptr + 1);
335 if (m_bufinfo->rptr == 0)
336 m_bufinfo->errtype = 4;
341 if (not m_procIsBusy) {
348 int RingBuffer::spyq(
int* buf)
const
351 if (m_bufinfo->nbuf <= 0) {
354 int* r_ptr = m_buftop + m_bufinfo->rptr;
357 printf(
"RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358 printf(
"RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
363 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
368 int RingBuffer::numq()
const
370 return m_bufinfo->nbuf;
373 void RingBuffer::txAttached()
376 if (m_bufinfo->numAttachedTx == -1)
377 m_bufinfo->numAttachedTx = 0;
379 m_bufinfo->numAttachedTx++;
381 void RingBuffer::txDetached()
384 m_bufinfo->numAttachedTx--;
385 if (m_bufinfo->numAttachedTx < 0) {
386 m_bufinfo->numAttachedTx = 0;
389 void RingBuffer::kill()
391 m_bufinfo->numAttachedTx = 0;
394 bool RingBuffer::isDead()
const
398 return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
400 bool RingBuffer::allRxWaiting()
const
403 return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
406 int RingBuffer::ninsq()
const
408 return m_bufinfo->ninsq;
411 int RingBuffer::nremq()
const
413 return m_bufinfo->nremq;
416 int RingBuffer::insq_counter()
const
418 return m_insq_counter;
421 int RingBuffer::remq_counter()
const
423 return m_remq_counter;
426 int RingBuffer::clear()
430 m_bufinfo->remain = m_bufinfo->size;
432 m_bufinfo->prevwptr = 0;
435 m_bufinfo->ninsq = 0;
436 m_bufinfo->nremq = 0;
441 void RingBuffer::forceClear()
444 if (semctl(m_semid, 0, SETVAL, val) == -1) {
445 B2ERROR(
"Initializing semaphore with semctl() failed.");
450 int RingBuffer::tryClear()
452 if (SemaphoreLocker::isLocked(m_semid))
458 int RingBuffer::shmid()
const
463 void RingBuffer::dumpInfo()
const
468 printf(
"***** Ring Buffer Information ***\n");
469 printf(
"path = %s\n", m_pathname.c_str());
470 printf(
"shmsize = %d\n", m_shmsize);
471 printf(
"[Buffer Info]\n");
472 printf(
"bufsize = %d\n", m_bufinfo->size);
473 printf(
"remain = %d\n", m_bufinfo->remain);
474 printf(
"wptr = %d\n", m_bufinfo->wptr);
475 printf(
"prevwptr = %d\n", m_bufinfo->prevwptr);
476 printf(
"rptr = %d\n", m_bufinfo->rptr);
477 printf(
"nbuf = %d\n", m_bufinfo->nbuf);
478 printf(
"nattached = %d\n", m_bufinfo->nattached);
479 printf(
"nbusy = %d\n", m_bufinfo->nbusy);
480 printf(
"errtype = %d\n", m_bufinfo->errtype);
481 printf(
"numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
482 printf(
"ninsq = %d\n", m_bufinfo->ninsq);
483 printf(
"nremq = %d\n", m_bufinfo->ninsq);
Handles creation, locking and unlocking of System V semaphores.
Abstract base class for different kinds of events.
Internal metadata structure for RingBuffer.
int size
ring buffer size (integers), minus this header.