Belle II Software  release-05-01-25
RingBuffer.cc
Go to the documentation of this file.
1 
6 //+
7 // File : RingBuffer.cc
8 // Description : Ring Buffer manager on shared memory
9 //
10 // Author : Ryosuke Itoh, IPNS, KEK
11 // Date : 29 - Apr - 2000
12 //-
13 
14 #include <framework/pcore/RingBuffer.h>
15 #include <framework/pcore/SemaphoreLocker.h>
16 #include <framework/logging/Logger.h>
17 
18 #include <sys/ipc.h>
19 #include <sys/shm.h>
20 
21 #include <cerrno>
22 #include <cstdio>
23 #include <unistd.h>
24 #include <cstring>
25 #include <fcntl.h>
26 #include <cstdlib>
27 #include <sys/sem.h>
28 
29 #include <fstream>
30 #include <stdexcept>
31 
32 using namespace std;
33 using namespace Belle2;
34 
35 // Constructor of Private Ringbuffer
36 RingBuffer::RingBuffer(int size)
37 {
38  openSHM(size);
39  B2DEBUG(32, "RingBuffer initialization done");
40 }
41 
42 // Constructor of Global Ringbuffer with name
43 RingBuffer::RingBuffer(const std::string& name, unsigned int nwords)
44 {
45  // 0. Determine shared memory type
46  if (name != "private") { // Global
47  m_file = true;
48  m_pathname = string("/tmp/") + getenv("USER") + "_RB_" + name;
49  m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
50  if (m_pathfd > 0) { // a new shared memory file created
51  B2DEBUG(32, "[RingBuffer] Creating a ring buffer with key " << name);
52  m_new = true;
53  } else if (m_pathfd == -1 && errno == EEXIST) { // shm already there
54  B2DEBUG(32, "[RingBuffer] Attaching the ring buffer with key " << name);
55  m_new = false;
56  } else {
57  B2FATAL("RingBuffer: error opening shm file: " << m_pathname);
58  }
59  m_shmkey = ftok(m_pathname.c_str(), 1);
60  m_semkey = ftok(m_pathname.c_str(), 2);
61  } else { // Private
62  B2DEBUG(32, "[RingBuffer] Opening private ring buffer");
63  }
64 
65  openSHM(nwords);
66 
67  if (m_pathfd > 0) {
68  B2DEBUG(32, "First global RingBuffer creation: writing SHM info to file.");
69  char rbufinfo[256];
70  snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_shmid);
71  int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
72  if (is < 0) perror("write");
73  snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_semid);
74  is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
75  if (is < 0) perror("write");
76  close(m_pathfd);
77  }
78 
79 
80  B2DEBUG(32, "RingBuffer initialization done with shm=" << m_shmid);
81 }
82 
83 RingBuffer::~RingBuffer()
84 {
85  cleanup();
86 }
87 
88 void RingBuffer::openSHM(int nwords)
89 {
90  // 1. Open shared memory
91  unsigned int sizeBytes = nwords * sizeof(int);
92  const auto mode = IPC_CREAT | 0644;
93  m_shmid = shmget(m_shmkey, sizeBytes, mode);
94  if (m_shmid < 0) {
95  unsigned int maxSizeBytes = sizeBytes;
96  ifstream shmax("/proc/sys/kernel/shmmax");
97  if (shmax.good())
98  shmax >> maxSizeBytes;
99  shmax.close();
100 
101  B2WARNING("RingBuffer: shmget(" << sizeBytes << ") failed, limiting to system maximum: " << maxSizeBytes);
102  sizeBytes = maxSizeBytes;
103  nwords = maxSizeBytes / sizeof(int);
104  m_shmid = shmget(m_shmkey, sizeBytes, mode);
105  if (m_shmid < 0) {
106  B2FATAL("RingBuffer: shmget(" << sizeBytes <<
107  ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
108  }
109  }
110  m_shmadr = (int*) shmat(m_shmid, nullptr, 0);
111  if (m_shmadr == (int*) - 1) {
112  B2FATAL("RingBuffer: Attaching to shared memory segment via shmat() failed");
113  }
114 
115  // 2. Open Semaphore
116  m_semid = SemaphoreLocker::create(m_semkey);
117  if (m_semid < 0) {
118  cleanup();
119  B2FATAL("Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
120  }
121  SemaphoreLocker locker(m_semid); //prevent simultaneous initialization
122 
123  // 3. Initialize control parameters
124  m_shmsize = nwords;
125  m_bufinfo = reinterpret_cast<RingBufInfo*>(m_shmadr);
126  m_buftop = m_shmadr + sizeof(struct RingBufInfo);
127  if (m_new) {
128  m_bufinfo->size = m_shmsize - sizeof(struct RingBufInfo);
129  m_bufinfo->remain = m_bufinfo->size;
130  m_bufinfo->wptr = 0;
131  m_bufinfo->prevwptr = 0;
132  m_bufinfo->rptr = 0;
133  m_bufinfo->nbuf = 0;
134  m_bufinfo->semid = m_semid;
135  m_bufinfo->nattached = 1;
136  m_bufinfo->nbusy = 0;
137  m_bufinfo->errtype = 0;
138  m_bufinfo->numAttachedTx = -1;
139  m_bufinfo->ninsq = 0;
140  m_bufinfo->nremq = 0;
141  } else {
142  m_bufinfo->nattached++;
143 
144  B2DEBUG(32, "[RingBuffer] check entries = " << m_bufinfo->nbuf);
145  B2DEBUG(32, "[RingBuffer] check size = " << m_bufinfo->size);
146  }
147 
148  m_remq_counter = 0;
149  m_insq_counter = 0;
150 
151  if (m_new) {
152  // Leave id of shm and semaphore in file name
153  m_semshmFileName = "/tmp/SHM" + to_string(m_shmid) + "-SEM" + to_string(m_semid) + "-UNNAMED";
154  int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
155  if (fd < 0) {
156  B2WARNING("RingBuffer ID file could not be created.");
157  } else {
158  close(fd);
159  }
160  }
161 
162  B2DEBUG(35, "buftop = " << m_buftop << ", end = " << (m_buftop + m_bufinfo->size));
163 }
164 
165 void RingBuffer::cleanup()
166 {
167  if (m_procIsBusy) {
168  SemaphoreLocker locker(m_semid);
169  m_bufinfo->nbusy--;
170  m_procIsBusy = false;
171  }
172 
173  shmdt(m_shmadr);
174  B2DEBUG(32, "RingBuffer: Cleaning up IPC");
175  if (m_new) {
176  shmctl(m_shmid, IPC_RMID, (struct shmid_ds*) nullptr);
177  SemaphoreLocker::destroy(m_semid);
178  if (m_file) {
179  unlink(m_pathname.c_str());
180  }
181  unlink(m_semshmFileName.c_str());
182  }
183 }
184 
185 void RingBuffer::dump_db()
186 {
187  printf("bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
188  m_bufinfo->size, m_bufinfo->remain,
189  m_bufinfo->wptr, m_bufinfo->rptr, m_bufinfo->nbuf);
190 }
191 
192 int RingBuffer::insq(const int* buf, int size, bool checkTx)
193 {
194  if (size <= 0) {
195  B2FATAL("RingBuffer::insq() failed: invalid buffer size = " << size);
196  }
197  if (m_bufinfo->numAttachedTx == 0 and checkTx) {
198  //safe abort was requested
199  B2WARNING("Number of attached Tx is 0, so I will not go on with the processing.");
200  exit(0);
201  }
202  SemaphoreLocker locker(m_semid);
203  if (m_bufinfo->nbuf == 0) {
204  m_bufinfo->wptr = 0;
205  m_bufinfo->rptr = 0;
206  if (size > m_bufinfo->size + 2) {
207  throw std::runtime_error("[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
208  ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) + ")!");
209  return -1;
210  }
211  m_bufinfo->errtype = 0;
212  int* wptr = m_buftop + m_bufinfo->wptr;
213  *wptr = size;
214  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
215  memcpy(wptr + 2, buf, size * sizeof(int));
216  m_bufinfo->prevwptr = m_bufinfo->wptr;
217  m_bufinfo->wptr += (size + 2);
218  m_bufinfo->nbuf++;
219  m_bufinfo->ninsq++;
220  m_insq_counter++;
221  return size;
222  } else if (m_bufinfo->wptr > m_bufinfo->rptr) {
223  if (m_bufinfo->errtype != 4 &&
224  m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
225  B2ERROR("insq: Error in errtype 0; current=" << m_bufinfo->errtype);
226  return -1;
227  }
228  if (m_bufinfo->errtype == 3) {
229  // printf ( "---> errtype is 3, still remaining buffers\n" );
230  B2DEBUG(32, "[RingBuffer] errtype 3");
231  return -1;
232  } else if (m_bufinfo->errtype == 4) {
233  // printf ( "---> errtype returned to 0, wptr=%d, rptr=%d\n",
234  // m_bufinfo->wptr, m_bufinfo->rptr );
235  }
236  m_bufinfo->errtype = 0;
237  if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) { // normal case
238  int* wptr = m_buftop + m_bufinfo->wptr;
239  *wptr = size;
240  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
241  memcpy(wptr + 2, buf, size * sizeof(int));
242  m_bufinfo->prevwptr = m_bufinfo->wptr;
243  m_bufinfo->wptr += (size + 2);
244  m_bufinfo->nbuf++;
245  m_bufinfo->ninsq++;
246  m_insq_counter++;
247  return size;
248  } else {
249  if (m_bufinfo->rptr >= size + 2) { // buffer full and wptr>rptr
250  if (m_bufinfo->errtype != 0) {
251  B2ERROR("insq: Error in errtype 1; current=" << m_bufinfo->errtype);
252  return -1;
253  }
254  m_bufinfo->errtype = 1;
255  int* wptr = m_buftop;
256  memcpy(wptr + 2, buf, size * sizeof(int));
257  *wptr = size;
258  *(wptr + 1) = size + 2;
259  m_bufinfo->wptr = *(wptr + 1);
260  int* prevptr = m_buftop + m_bufinfo->prevwptr;
261  *(prevptr + 1) = 0;
262  m_bufinfo->prevwptr = 0;
263  if (m_bufinfo->nbuf == 0) {
264  m_bufinfo->errtype = 4;
265  m_bufinfo->rptr = 0;
266  }
267  m_bufinfo->nbuf++;
268  // printf ( "insq: no more space, space below rptr; prev=%d, new=%d\n",
269  // m_bufinfo->prevwptr, m_bufinfo->wptr );
270  m_bufinfo->ninsq++;
271  m_insq_counter++;
272  return size;
273  } else {
274  // printf ( "insq: wptr>rptr, no more space, no space below rptr(%d), readbuf=%d\n",
275  // m_bufinfo->rptr, m_bufinfo->readbuf );
276  return -1;
277  }
278  }
279  } else { // wptr < rptr
280  if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
281  size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
282  if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
283  m_bufinfo->errtype != 3) {
284  B2ERROR("insq: Error in errtype 2; current=" << m_bufinfo->errtype);
285  return -1;
286  }
287  m_bufinfo->errtype = 2;
288  int* wptr = m_buftop + m_bufinfo->wptr;
289  *wptr = size;
290  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
291  memcpy(wptr + 2, buf, size * sizeof(int));
292  m_bufinfo->prevwptr = m_bufinfo->wptr;
293  m_bufinfo->wptr += (size + 2);
294  m_bufinfo->nbuf++;
295  // printf ( "insq: wptr<rptr and enough space below rptr; curr=%d, next=%d, rptr=%d\n", m_bufinfo->prevwptr, m_bufinfo->wptr, m_bufinfo->rptr );
296  if (m_bufinfo->wptr > m_bufinfo->rptr) {
297  B2DEBUG(32, "next pointer will exceed rptr.....");
298  m_bufinfo->errtype = 3;
299  }
300  m_bufinfo->ninsq++;
301  m_insq_counter++;
302  return size;
303  } else {
304  // printf ( "insq: wptr<rptr; no more space below rptr(%d), wptr(%d)\n",
305  // m_bufinfo->rptr, m_bufinfo->wptr );
306  return -1;
307  }
308  }
309 }
310 
311 int RingBuffer::remq(int* buf)
312 {
313  SemaphoreLocker locker(m_semid);
314  if (m_bufinfo->nbuf < 0) {
315  throw std::runtime_error("[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
316  }
317  if (m_bufinfo->nbuf == 0) {
318  if (m_procIsBusy) {
319  m_bufinfo->nbusy--;
320  m_procIsBusy = false;
321  }
322  return 0;
323  }
324  int* r_ptr = m_buftop + m_bufinfo->rptr;
325  int nw = *r_ptr;
326  if (nw <= 0) {
327  printf("RingBuffer::remq : buffer size = %d, skipped\n", nw);
328  printf("RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
329  if (m_procIsBusy) {
330  m_bufinfo->nbusy--;
331  m_procIsBusy = false;
332  }
333  return 0;
334  }
335  if (buf)
336  memcpy(buf, r_ptr + 2, nw * sizeof(int));
337  m_bufinfo->rptr = *(r_ptr + 1);
338  if (m_bufinfo->rptr == 0)
339  m_bufinfo->errtype = 4;
340  m_bufinfo->nbuf--;
341  m_bufinfo->nremq++;
342  m_remq_counter++;
343 
344  if (not m_procIsBusy) {
345  m_bufinfo->nbusy++;
346  m_procIsBusy = true;
347  }
348  return nw;
349 }
350 
351 int RingBuffer::spyq(int* buf) const
352 {
353  SemaphoreLocker locker(m_semid);
354  if (m_bufinfo->nbuf <= 0) {
355  return 0;
356  }
357  int* r_ptr = m_buftop + m_bufinfo->rptr;
358  int nw = *r_ptr;
359  if (nw <= 0) {
360  printf("RingBuffer::spyq : buffer size = %d, skipped\n", nw);
361  printf("RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
362  return 0;
363  }
364  // printf ( "remq : taking buf from %d(%d)\n", m_bufinfo->rptr, nw );
365  // Copy buffer without modifying management parameters.
366  memcpy(buf, r_ptr + 2, nw * sizeof(int));
367  // Exit
368  return nw;
369 }
370 
371 int RingBuffer::numq() const
372 {
373  return m_bufinfo->nbuf;
374 }
375 
376 void RingBuffer::txAttached()
377 {
378  SemaphoreLocker locker(m_semid);
379  if (m_bufinfo->numAttachedTx == -1) //first attach
380  m_bufinfo->numAttachedTx = 0;
381 
382  m_bufinfo->numAttachedTx++;
383 }
384 void RingBuffer::txDetached()
385 {
386  SemaphoreLocker locker(m_semid);
387  m_bufinfo->numAttachedTx--;
388  if (m_bufinfo->numAttachedTx < 0) {
389  m_bufinfo->numAttachedTx = 0;
390  }
391 }
392 void RingBuffer::kill()
393 {
394  m_bufinfo->numAttachedTx = 0;
395  m_bufinfo->nbuf = 0;
396 }
397 bool RingBuffer::isDead() const
398 {
399  SemaphoreLocker locker(m_semid);
400  //NOTE: numAttachedTx == -1 also means we should read data (i.e. initialization pending)
401  return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
402 }
403 bool RingBuffer::allRxWaiting() const
404 {
405  SemaphoreLocker locker(m_semid);
406  return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
407 }
408 
409 int RingBuffer::ninsq() const
410 {
411  return m_bufinfo->ninsq;
412 }
413 
414 int RingBuffer::nremq() const
415 {
416  return m_bufinfo->nremq;
417 }
418 
419 int RingBuffer::insq_counter() const
420 {
421  return m_insq_counter;
422 }
423 
424 int RingBuffer::remq_counter() const
425 {
426  return m_remq_counter;
427 }
428 
429 int RingBuffer::clear()
430 {
431  SemaphoreLocker locker(m_semid);
432  // m_bufinfo->size = m_shmsize - sizeof ( struct RingBufInfo );
433  m_bufinfo->remain = m_bufinfo->size;
434  m_bufinfo->wptr = 0;
435  m_bufinfo->prevwptr = 0;
436  m_bufinfo->rptr = 0;
437  m_bufinfo->nbuf = 0;
438  m_bufinfo->ninsq = 0;
439  m_bufinfo->nremq = 0;
440 
441  return 0;
442 }
443 
444 void RingBuffer::forceClear()
445 {
446  int val = 1;
447  if (semctl(m_semid, 0, SETVAL, val) == -1) { //set 0th semaphore to semval
448  B2ERROR("Initializing semaphore with semctl() failed.");
449  }
450  clear();
451 }
452 
453 int RingBuffer::tryClear()
454 {
455  if (SemaphoreLocker::isLocked(m_semid))
456  return 0;
457 
458  return clear();
459 }
460 
461 int RingBuffer::shmid() const
462 {
463  return m_shmid;
464 }
465 
466 void RingBuffer::dumpInfo() const
467 {
468  SemaphoreLocker locker(m_semid);
469 
470  // Dump control parameters
471  printf("***** Ring Buffer Information ***\n");
472  printf("path = %s\n", m_pathname.c_str());
473  printf("shmsize = %d\n", m_shmsize);
474  printf("[Buffer Info]\n");
475  printf("bufsize = %d\n", m_bufinfo->size);
476  printf("remain = %d\n", m_bufinfo->remain);
477  printf("wptr = %d\n", m_bufinfo->wptr);
478  printf("prevwptr = %d\n", m_bufinfo->prevwptr);
479  printf("rptr = %d\n", m_bufinfo->rptr);
480  printf("nbuf = %d\n", m_bufinfo->nbuf);
481  printf("nattached = %d\n", m_bufinfo->nattached);
482  printf("nbusy = %d\n", m_bufinfo->nbusy);
483  printf("errtype = %d\n", m_bufinfo->errtype);
484  printf("numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
485  printf("ninsq = %d\n", m_bufinfo->ninsq);
486  printf("nremq = %d\n", m_bufinfo->ninsq);
487 }
488 
Belle2::RingBufInfo::size
int size
ring buffer size (integers), minus this header.
Definition: RingBuffer.h:19
Belle2::RingBufInfo
Internal metadata structure for RingBuffer.
Definition: RingBuffer.h:18
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::SemaphoreLocker
Handles creation, locking and unlocking of System V semaphores.
Definition: SemaphoreLocker.h:17