14 #include <framework/pcore/RingBuffer.h>
15 #include <framework/pcore/SemaphoreLocker.h>
16 #include <framework/logging/Logger.h>
36 RingBuffer::RingBuffer(
int size)
39 B2DEBUG(32,
"RingBuffer initialization done");
43 RingBuffer::RingBuffer(
const std::string& name,
unsigned int nwords)
46 if (name !=
"private") {
48 m_pathname = string(
"/tmp/") + getenv(
"USER") +
"_RB_" + name;
49 m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
51 B2DEBUG(32,
"[RingBuffer] Creating a ring buffer with key " << name);
53 }
else if (m_pathfd == -1 && errno == EEXIST) {
54 B2DEBUG(32,
"[RingBuffer] Attaching the ring buffer with key " << name);
57 B2FATAL(
"RingBuffer: error opening shm file: " << m_pathname);
59 m_shmkey = ftok(m_pathname.c_str(), 1);
60 m_semkey = ftok(m_pathname.c_str(), 2);
62 B2DEBUG(32,
"[RingBuffer] Opening private ring buffer");
68 B2DEBUG(32,
"First global RingBuffer creation: writing SHM info to file.");
70 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n", m_shmid);
71 int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
72 if (is < 0) perror(
"write");
73 snprintf(rbufinfo,
sizeof(rbufinfo),
"%d\n", m_semid);
74 is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
75 if (is < 0) perror(
"write");
80 B2DEBUG(32,
"RingBuffer initialization done with shm=" << m_shmid);
83 RingBuffer::~RingBuffer()
88 void RingBuffer::openSHM(
int nwords)
91 unsigned int sizeBytes = nwords *
sizeof(int);
92 const auto mode = IPC_CREAT | 0644;
93 m_shmid = shmget(m_shmkey, sizeBytes, mode);
95 unsigned int maxSizeBytes = sizeBytes;
96 ifstream shmax(
"/proc/sys/kernel/shmmax");
98 shmax >> maxSizeBytes;
101 B2WARNING(
"RingBuffer: shmget(" << sizeBytes <<
") failed, limiting to system maximum: " << maxSizeBytes);
102 sizeBytes = maxSizeBytes;
103 nwords = maxSizeBytes /
sizeof(int);
104 m_shmid = shmget(m_shmkey, sizeBytes, mode);
106 B2FATAL(
"RingBuffer: shmget(" << sizeBytes <<
107 ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
110 m_shmadr = (
int*) shmat(m_shmid,
nullptr, 0);
111 if (m_shmadr == (
int*) - 1) {
112 B2FATAL(
"RingBuffer: Attaching to shared memory segment via shmat() failed");
116 m_semid = SemaphoreLocker::create(m_semkey);
119 B2FATAL(
"Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
125 m_bufinfo =
reinterpret_cast<RingBufInfo*
>(m_shmadr);
129 m_bufinfo->remain = m_bufinfo->size;
131 m_bufinfo->prevwptr = 0;
134 m_bufinfo->semid = m_semid;
135 m_bufinfo->nattached = 1;
136 m_bufinfo->nbusy = 0;
137 m_bufinfo->errtype = 0;
138 m_bufinfo->numAttachedTx = -1;
139 m_bufinfo->ninsq = 0;
140 m_bufinfo->nremq = 0;
142 m_bufinfo->nattached++;
144 B2DEBUG(32,
"[RingBuffer] check entries = " << m_bufinfo->nbuf);
145 B2DEBUG(32,
"[RingBuffer] check size = " << m_bufinfo->size);
153 m_semshmFileName =
"/tmp/SHM" + to_string(m_shmid) +
"-SEM" + to_string(m_semid) +
"-UNNAMED";
154 int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
156 B2WARNING(
"RingBuffer ID file could not be created.");
162 B2DEBUG(35,
"buftop = " << m_buftop <<
", end = " << (m_buftop + m_bufinfo->size));
165 void RingBuffer::cleanup()
170 m_procIsBusy =
false;
174 B2DEBUG(32,
"RingBuffer: Cleaning up IPC");
176 shmctl(m_shmid, IPC_RMID, (
struct shmid_ds*)
nullptr);
177 SemaphoreLocker::destroy(m_semid);
179 unlink(m_pathname.c_str());
181 unlink(m_semshmFileName.c_str());
185 void RingBuffer::dump_db()
187 printf(
"bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
188 m_bufinfo->size, m_bufinfo->remain,
189 m_bufinfo->wptr, m_bufinfo->rptr, m_bufinfo->nbuf);
192 int RingBuffer::insq(
const int* buf,
int size,
bool checkTx)
195 B2FATAL(
"RingBuffer::insq() failed: invalid buffer size = " << size);
197 if (m_bufinfo->numAttachedTx == 0 and checkTx) {
199 B2WARNING(
"Number of attached Tx is 0, so I will not go on with the processing.");
203 if (m_bufinfo->nbuf == 0) {
206 if (size > m_bufinfo->size + 2) {
207 throw std::runtime_error(
"[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
208 ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) +
")!");
211 m_bufinfo->errtype = 0;
212 int* wptr = m_buftop + m_bufinfo->wptr;
214 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
215 memcpy(wptr + 2, buf, size *
sizeof(
int));
216 m_bufinfo->prevwptr = m_bufinfo->wptr;
217 m_bufinfo->wptr += (size + 2);
222 }
else if (m_bufinfo->wptr > m_bufinfo->rptr) {
223 if (m_bufinfo->errtype != 4 &&
224 m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
225 B2ERROR(
"insq: Error in errtype 0; current=" << m_bufinfo->errtype);
228 if (m_bufinfo->errtype == 3) {
230 B2DEBUG(32,
"[RingBuffer] errtype 3");
232 }
else if (m_bufinfo->errtype == 4) {
236 m_bufinfo->errtype = 0;
237 if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) {
238 int* wptr = m_buftop + m_bufinfo->wptr;
240 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
241 memcpy(wptr + 2, buf, size *
sizeof(
int));
242 m_bufinfo->prevwptr = m_bufinfo->wptr;
243 m_bufinfo->wptr += (size + 2);
249 if (m_bufinfo->rptr >= size + 2) {
250 if (m_bufinfo->errtype != 0) {
251 B2ERROR(
"insq: Error in errtype 1; current=" << m_bufinfo->errtype);
254 m_bufinfo->errtype = 1;
255 int* wptr = m_buftop;
256 memcpy(wptr + 2, buf, size *
sizeof(
int));
258 *(wptr + 1) = size + 2;
259 m_bufinfo->wptr = *(wptr + 1);
260 int* prevptr = m_buftop + m_bufinfo->prevwptr;
262 m_bufinfo->prevwptr = 0;
263 if (m_bufinfo->nbuf == 0) {
264 m_bufinfo->errtype = 4;
280 if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
281 size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
282 if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
283 m_bufinfo->errtype != 3) {
284 B2ERROR(
"insq: Error in errtype 2; current=" << m_bufinfo->errtype);
287 m_bufinfo->errtype = 2;
288 int* wptr = m_buftop + m_bufinfo->wptr;
290 *(wptr + 1) = m_bufinfo->wptr + (size + 2);
291 memcpy(wptr + 2, buf, size *
sizeof(
int));
292 m_bufinfo->prevwptr = m_bufinfo->wptr;
293 m_bufinfo->wptr += (size + 2);
296 if (m_bufinfo->wptr > m_bufinfo->rptr) {
297 B2DEBUG(32,
"next pointer will exceed rptr.....");
298 m_bufinfo->errtype = 3;
311 int RingBuffer::remq(
int* buf)
314 if (m_bufinfo->nbuf < 0) {
315 throw std::runtime_error(
"[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
317 if (m_bufinfo->nbuf == 0) {
320 m_procIsBusy =
false;
324 int* r_ptr = m_buftop + m_bufinfo->rptr;
327 printf(
"RingBuffer::remq : buffer size = %d, skipped\n", nw);
328 printf(
"RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
331 m_procIsBusy =
false;
336 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
337 m_bufinfo->rptr = *(r_ptr + 1);
338 if (m_bufinfo->rptr == 0)
339 m_bufinfo->errtype = 4;
344 if (not m_procIsBusy) {
351 int RingBuffer::spyq(
int* buf)
const
354 if (m_bufinfo->nbuf <= 0) {
357 int* r_ptr = m_buftop + m_bufinfo->rptr;
360 printf(
"RingBuffer::spyq : buffer size = %d, skipped\n", nw);
361 printf(
"RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
366 memcpy(buf, r_ptr + 2, nw *
sizeof(
int));
371 int RingBuffer::numq()
const
373 return m_bufinfo->nbuf;
376 void RingBuffer::txAttached()
379 if (m_bufinfo->numAttachedTx == -1)
380 m_bufinfo->numAttachedTx = 0;
382 m_bufinfo->numAttachedTx++;
384 void RingBuffer::txDetached()
387 m_bufinfo->numAttachedTx--;
388 if (m_bufinfo->numAttachedTx < 0) {
389 m_bufinfo->numAttachedTx = 0;
392 void RingBuffer::kill()
394 m_bufinfo->numAttachedTx = 0;
397 bool RingBuffer::isDead()
const
401 return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
403 bool RingBuffer::allRxWaiting()
const
406 return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
409 int RingBuffer::ninsq()
const
411 return m_bufinfo->ninsq;
414 int RingBuffer::nremq()
const
416 return m_bufinfo->nremq;
419 int RingBuffer::insq_counter()
const
421 return m_insq_counter;
424 int RingBuffer::remq_counter()
const
426 return m_remq_counter;
429 int RingBuffer::clear()
433 m_bufinfo->remain = m_bufinfo->size;
435 m_bufinfo->prevwptr = 0;
438 m_bufinfo->ninsq = 0;
439 m_bufinfo->nremq = 0;
444 void RingBuffer::forceClear()
447 if (semctl(m_semid, 0, SETVAL, val) == -1) {
448 B2ERROR(
"Initializing semaphore with semctl() failed.");
453 int RingBuffer::tryClear()
455 if (SemaphoreLocker::isLocked(m_semid))
461 int RingBuffer::shmid()
const
466 void RingBuffer::dumpInfo()
const
471 printf(
"***** Ring Buffer Information ***\n");
472 printf(
"path = %s\n", m_pathname.c_str());
473 printf(
"shmsize = %d\n", m_shmsize);
474 printf(
"[Buffer Info]\n");
475 printf(
"bufsize = %d\n", m_bufinfo->size);
476 printf(
"remain = %d\n", m_bufinfo->remain);
477 printf(
"wptr = %d\n", m_bufinfo->wptr);
478 printf(
"prevwptr = %d\n", m_bufinfo->prevwptr);
479 printf(
"rptr = %d\n", m_bufinfo->rptr);
480 printf(
"nbuf = %d\n", m_bufinfo->nbuf);
481 printf(
"nattached = %d\n", m_bufinfo->nattached);
482 printf(
"nbusy = %d\n", m_bufinfo->nbusy);
483 printf(
"errtype = %d\n", m_bufinfo->errtype);
484 printf(
"numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
485 printf(
"ninsq = %d\n", m_bufinfo->ninsq);
486 printf(
"nremq = %d\n", m_bufinfo->ninsq);