Belle II Software  release-08-01-10
RingBuffer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/RingBuffer.h>
10 #include <framework/pcore/SemaphoreLocker.h>
11 #include <framework/logging/Logger.h>
12 
13 #include <sys/ipc.h>
14 #include <sys/shm.h>
15 
16 #include <cerrno>
17 #include <cstdio>
18 #include <unistd.h>
19 #include <cstring>
20 #include <fcntl.h>
21 #include <cstdlib>
22 #include <sys/sem.h>
23 
24 #include <filesystem>
25 #include <fstream>
26 #include <stdexcept>
27 
28 using namespace std;
29 using namespace Belle2;
30 
31 // Constructor of Private Ringbuffer
32 RingBuffer::RingBuffer(int size)
33 {
34  openSHM(size);
35  B2DEBUG(32, "RingBuffer initialization done");
36 }
37 
38 // Constructor of Global Ringbuffer with name
39 RingBuffer::RingBuffer(const std::string& name, unsigned int nwords)
40 {
41  // 0. Determine shared memory type
42  if (name != "private") { // Global
43  m_file = true;
44  m_pathname = std::string(std::filesystem::temp_directory_path() / getenv("USER")) + "_RB_" + name;
45  m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
46  if (m_pathfd > 0) { // a new shared memory file created
47  B2DEBUG(32, "[RingBuffer] Creating a ring buffer with key " << name);
48  m_new = true;
49  } else if (m_pathfd == -1 && errno == EEXIST) { // shm already there
50  B2DEBUG(32, "[RingBuffer] Attaching the ring buffer with key " << name);
51  m_new = false;
52  } else {
53  B2FATAL("RingBuffer: error opening shm file: " << m_pathname);
54  }
55  m_shmkey = ftok(m_pathname.c_str(), 1);
56  m_semkey = ftok(m_pathname.c_str(), 2);
57  } else { // Private
58  B2DEBUG(32, "[RingBuffer] Opening private ring buffer");
59  }
60 
61  openSHM(nwords);
62 
63  if (m_pathfd > 0) {
64  B2DEBUG(32, "First global RingBuffer creation: writing SHM info to file.");
65  char rbufinfo[256];
66  snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_shmid);
67  int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
68  if (is < 0) perror("write");
69  snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_semid);
70  is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
71  if (is < 0) perror("write");
72  close(m_pathfd);
73  }
74 
75 
76  B2DEBUG(32, "RingBuffer initialization done with shm=" << m_shmid);
77 }
78 
79 RingBuffer::~RingBuffer()
80 {
81  cleanup();
82 }
83 
84 void RingBuffer::openSHM(int nwords)
85 {
86  // 1. Open shared memory
87  unsigned int sizeBytes = nwords * sizeof(int);
88  const auto mode = IPC_CREAT | 0644;
89  m_shmid = shmget(m_shmkey, sizeBytes, mode);
90  if (m_shmid < 0) {
91  unsigned int maxSizeBytes = sizeBytes;
92  ifstream shmax("/proc/sys/kernel/shmmax");
93  if (shmax.good())
94  shmax >> maxSizeBytes;
95  shmax.close();
96 
97  B2WARNING("RingBuffer: shmget(" << sizeBytes << ") failed, limiting to system maximum: " << maxSizeBytes);
98  sizeBytes = maxSizeBytes;
99  nwords = maxSizeBytes / sizeof(int);
100  m_shmid = shmget(m_shmkey, sizeBytes, mode);
101  if (m_shmid < 0) {
102  B2FATAL("RingBuffer: shmget(" << sizeBytes <<
103  ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
104  }
105  }
106  m_shmadr = (int*) shmat(m_shmid, nullptr, 0);
107  if (m_shmadr == (int*) - 1) {
108  B2FATAL("RingBuffer: Attaching to shared memory segment via shmat() failed");
109  }
110 
111  // 2. Open Semaphore
112  m_semid = SemaphoreLocker::create(m_semkey);
113  if (m_semid < 0) {
114  cleanup();
115  B2FATAL("Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
116  }
117  SemaphoreLocker locker(m_semid); //prevent simultaneous initialization
118 
119  // 3. Initialize control parameters
120  m_shmsize = nwords;
121  m_bufinfo = reinterpret_cast<RingBufInfo*>(m_shmadr);
122  m_buftop = m_shmadr + sizeof(struct RingBufInfo);
123  if (m_new) {
124  m_bufinfo->size = m_shmsize - sizeof(struct RingBufInfo);
125  m_bufinfo->remain = m_bufinfo->size;
126  m_bufinfo->wptr = 0;
127  m_bufinfo->prevwptr = 0;
128  m_bufinfo->rptr = 0;
129  m_bufinfo->nbuf = 0;
130  m_bufinfo->semid = m_semid;
131  m_bufinfo->nattached = 1;
132  m_bufinfo->nbusy = 0;
133  m_bufinfo->errtype = 0;
134  m_bufinfo->numAttachedTx = -1;
135  m_bufinfo->ninsq = 0;
136  m_bufinfo->nremq = 0;
137  } else {
138  m_bufinfo->nattached++;
139 
140  B2DEBUG(32, "[RingBuffer] check entries = " << m_bufinfo->nbuf);
141  B2DEBUG(32, "[RingBuffer] check size = " << m_bufinfo->size);
142  }
143 
144  m_remq_counter = 0;
145  m_insq_counter = 0;
146 
147  if (m_new) {
148  // Leave id of shm and semaphore in file name
149  m_semshmFileName = std::string(std::filesystem::temp_directory_path() / "SHM") + to_string(m_shmid) + "-SEM" + to_string(
150  m_semid) + "-UNNAMED";
151  int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
152  if (fd < 0) {
153  B2WARNING("RingBuffer ID file could not be created.");
154  } else {
155  close(fd);
156  }
157  }
158 
159  B2DEBUG(35, "buftop = " << m_buftop << ", end = " << (m_buftop + m_bufinfo->size));
160 }
161 
162 void RingBuffer::cleanup()
163 {
164  if (m_procIsBusy) {
165  SemaphoreLocker locker(m_semid);
166  m_bufinfo->nbusy--;
167  m_procIsBusy = false;
168  }
169 
170  shmdt(m_shmadr);
171  B2DEBUG(32, "RingBuffer: Cleaning up IPC");
172  if (m_new) {
173  shmctl(m_shmid, IPC_RMID, (struct shmid_ds*) nullptr);
174  SemaphoreLocker::destroy(m_semid);
175  if (m_file) {
176  unlink(m_pathname.c_str());
177  }
178  unlink(m_semshmFileName.c_str());
179  }
180 }
181 
182 void RingBuffer::dump_db()
183 {
184  printf("bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
185  m_bufinfo->size, m_bufinfo->remain,
186  m_bufinfo->wptr, m_bufinfo->rptr, m_bufinfo->nbuf);
187 }
188 
189 int RingBuffer::insq(const int* buf, int size, bool checkTx)
190 {
191  if (size <= 0) {
192  B2FATAL("RingBuffer::insq() failed: invalid buffer size = " << size);
193  }
194  if (m_bufinfo->numAttachedTx == 0 and checkTx) {
195  //safe abort was requested
196  B2WARNING("Number of attached Tx is 0, so I will not go on with the processing.");
197  exit(0);
198  }
199  SemaphoreLocker locker(m_semid);
200  if (m_bufinfo->nbuf == 0) {
201  m_bufinfo->wptr = 0;
202  m_bufinfo->rptr = 0;
203  if (size > m_bufinfo->size + 2) {
204  throw std::runtime_error("[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
205  ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) + ")!");
206  }
207  m_bufinfo->errtype = 0;
208  int* wptr = m_buftop + m_bufinfo->wptr;
209  *wptr = size;
210  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
211  memcpy(wptr + 2, buf, size * sizeof(int));
212  m_bufinfo->prevwptr = m_bufinfo->wptr;
213  m_bufinfo->wptr += (size + 2);
214  m_bufinfo->nbuf++;
215  m_bufinfo->ninsq++;
216  m_insq_counter++;
217  return size;
218  } else if (m_bufinfo->wptr > m_bufinfo->rptr) {
219  if (m_bufinfo->errtype != 4 &&
220  m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
221  B2ERROR("insq: Error in errtype 0; current=" << m_bufinfo->errtype);
222  return -1;
223  }
224  if (m_bufinfo->errtype == 3) {
225  // printf ( "---> errtype is 3, still remaining buffers\n" );
226  B2DEBUG(32, "[RingBuffer] errtype 3");
227  return -1;
228  } else if (m_bufinfo->errtype == 4) {
229  // printf ( "---> errtype returned to 0, wptr=%d, rptr=%d\n",
230  // m_bufinfo->wptr, m_bufinfo->rptr );
231  }
232  m_bufinfo->errtype = 0;
233  if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) { // normal case
234  int* wptr = m_buftop + m_bufinfo->wptr;
235  *wptr = size;
236  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
237  memcpy(wptr + 2, buf, size * sizeof(int));
238  m_bufinfo->prevwptr = m_bufinfo->wptr;
239  m_bufinfo->wptr += (size + 2);
240  m_bufinfo->nbuf++;
241  m_bufinfo->ninsq++;
242  m_insq_counter++;
243  return size;
244  } else {
245  if (m_bufinfo->rptr >= size + 2) { // buffer full and wptr>rptr
246  // this should be dead code but I don't understand it well enough to delete it
247  if (m_bufinfo->errtype != 0) {
248  B2ERROR("insq: Error in errtype 1; current=" << m_bufinfo->errtype);
249  return -1;
250  }
251  m_bufinfo->errtype = 1;
252  int* wptr = m_buftop;
253  memcpy(wptr + 2, buf, size * sizeof(int));
254  *wptr = size;
255  *(wptr + 1) = size + 2;
256  m_bufinfo->wptr = *(wptr + 1);
257  int* prevptr = m_buftop + m_bufinfo->prevwptr;
258  *(prevptr + 1) = 0;
259  m_bufinfo->prevwptr = 0;
260  if (m_bufinfo->nbuf == 0) {
261  m_bufinfo->errtype = 4;
262  m_bufinfo->rptr = 0;
263  }
264  m_bufinfo->nbuf++;
265  // printf ( "insq: no more space, space below rptr; prev=%d, new=%d\n",
266  // m_bufinfo->prevwptr, m_bufinfo->wptr );
267  m_bufinfo->ninsq++;
268  m_insq_counter++;
269  return size;
270  } else {
271  // printf ( "insq: wptr>rptr, no more space, no space below rptr(%d), readbuf=%d\n",
272  // m_bufinfo->rptr, m_bufinfo->readbuf );
273  return -1;
274  }
275  }
276  } else { // wptr < rptr
277  if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
278  size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
279  if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
280  m_bufinfo->errtype != 3) {
281  B2ERROR("insq: Error in errtype 2; current=" << m_bufinfo->errtype);
282  return -1;
283  }
284  m_bufinfo->errtype = 2;
285  int* wptr = m_buftop + m_bufinfo->wptr;
286  *wptr = size;
287  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
288  memcpy(wptr + 2, buf, size * sizeof(int));
289  m_bufinfo->prevwptr = m_bufinfo->wptr;
290  m_bufinfo->wptr += (size + 2);
291  m_bufinfo->nbuf++;
292  // printf ( "insq: wptr<rptr and enough space below rptr; curr=%d, next=%d, rptr=%d\n", m_bufinfo->prevwptr, m_bufinfo->wptr, m_bufinfo->rptr );
293  if (m_bufinfo->wptr > m_bufinfo->rptr) {
294  B2DEBUG(32, "next pointer will exceed rptr.....");
295  m_bufinfo->errtype = 3;
296  }
297  m_bufinfo->ninsq++;
298  m_insq_counter++;
299  return size;
300  } else {
301  // printf ( "insq: wptr<rptr; no more space below rptr(%d), wptr(%d)\n",
302  // m_bufinfo->rptr, m_bufinfo->wptr );
303  return -1;
304  }
305  }
306 }
307 
308 int RingBuffer::remq(int* buf)
309 {
310  SemaphoreLocker locker(m_semid);
311  if (m_bufinfo->nbuf < 0) {
312  throw std::runtime_error("[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
313  }
314  if (m_bufinfo->nbuf == 0) {
315  if (m_procIsBusy) {
316  m_bufinfo->nbusy--;
317  m_procIsBusy = false;
318  }
319  return 0;
320  }
321  int* r_ptr = m_buftop + m_bufinfo->rptr;
322  int nw = *r_ptr;
323  if (nw <= 0) {
324  printf("RingBuffer::remq : buffer size = %d, skipped\n", nw);
325  printf("RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
326  if (m_procIsBusy) {
327  m_bufinfo->nbusy--;
328  m_procIsBusy = false;
329  }
330  return 0;
331  }
332  if (buf)
333  memcpy(buf, r_ptr + 2, nw * sizeof(int));
334  m_bufinfo->rptr = *(r_ptr + 1);
335  if (m_bufinfo->rptr == 0)
336  m_bufinfo->errtype = 4;
337  m_bufinfo->nbuf--;
338  m_bufinfo->nremq++;
339  m_remq_counter++;
340 
341  if (not m_procIsBusy) {
342  m_bufinfo->nbusy++;
343  m_procIsBusy = true;
344  }
345  return nw;
346 }
347 
348 int RingBuffer::spyq(int* buf) const
349 {
350  SemaphoreLocker locker(m_semid);
351  if (m_bufinfo->nbuf <= 0) {
352  return 0;
353  }
354  int* r_ptr = m_buftop + m_bufinfo->rptr;
355  int nw = *r_ptr;
356  if (nw <= 0) {
357  printf("RingBuffer::spyq : buffer size = %d, skipped\n", nw);
358  printf("RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
359  return 0;
360  }
361  // printf ( "remq : taking buf from %d(%d)\n", m_bufinfo->rptr, nw );
362  // Copy buffer without modifying management parameters.
363  memcpy(buf, r_ptr + 2, nw * sizeof(int));
364  // Exit
365  return nw;
366 }
367 
368 int RingBuffer::numq() const
369 {
370  return m_bufinfo->nbuf;
371 }
372 
373 void RingBuffer::txAttached()
374 {
375  SemaphoreLocker locker(m_semid);
376  if (m_bufinfo->numAttachedTx == -1) //first attach
377  m_bufinfo->numAttachedTx = 0;
378 
379  m_bufinfo->numAttachedTx++;
380 }
381 void RingBuffer::txDetached()
382 {
383  SemaphoreLocker locker(m_semid);
384  m_bufinfo->numAttachedTx--;
385  if (m_bufinfo->numAttachedTx < 0) {
386  m_bufinfo->numAttachedTx = 0;
387  }
388 }
389 void RingBuffer::kill()
390 {
391  m_bufinfo->numAttachedTx = 0;
392  m_bufinfo->nbuf = 0;
393 }
394 bool RingBuffer::isDead() const
395 {
396  SemaphoreLocker locker(m_semid);
397  //NOTE: numAttachedTx == -1 also means we should read data (i.e. initialization pending)
398  return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
399 }
400 bool RingBuffer::allRxWaiting() const
401 {
402  SemaphoreLocker locker(m_semid);
403  return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
404 }
405 
406 int RingBuffer::ninsq() const
407 {
408  return m_bufinfo->ninsq;
409 }
410 
411 int RingBuffer::nremq() const
412 {
413  return m_bufinfo->nremq;
414 }
415 
416 int RingBuffer::insq_counter() const
417 {
418  return m_insq_counter;
419 }
420 
421 int RingBuffer::remq_counter() const
422 {
423  return m_remq_counter;
424 }
425 
426 int RingBuffer::clear()
427 {
428  SemaphoreLocker locker(m_semid);
429  // m_bufinfo->size = m_shmsize - sizeof ( struct RingBufInfo );
430  m_bufinfo->remain = m_bufinfo->size;
431  m_bufinfo->wptr = 0;
432  m_bufinfo->prevwptr = 0;
433  m_bufinfo->rptr = 0;
434  m_bufinfo->nbuf = 0;
435  m_bufinfo->ninsq = 0;
436  m_bufinfo->nremq = 0;
437 
438  return 0;
439 }
440 
441 void RingBuffer::forceClear()
442 {
443  int val = 1;
444  if (semctl(m_semid, 0, SETVAL, val) == -1) { //set 0th semaphore to semval
445  B2ERROR("Initializing semaphore with semctl() failed.");
446  }
447  clear();
448 }
449 
450 int RingBuffer::tryClear()
451 {
452  if (SemaphoreLocker::isLocked(m_semid))
453  return 0;
454 
455  return clear();
456 }
457 
458 int RingBuffer::shmid() const
459 {
460  return m_shmid;
461 }
462 
463 void RingBuffer::dumpInfo() const
464 {
465  SemaphoreLocker locker(m_semid);
466 
467  // Dump control parameters
468  printf("***** Ring Buffer Information ***\n");
469  printf("path = %s\n", m_pathname.c_str());
470  printf("shmsize = %d\n", m_shmsize);
471  printf("[Buffer Info]\n");
472  printf("bufsize = %d\n", m_bufinfo->size);
473  printf("remain = %d\n", m_bufinfo->remain);
474  printf("wptr = %d\n", m_bufinfo->wptr);
475  printf("prevwptr = %d\n", m_bufinfo->prevwptr);
476  printf("rptr = %d\n", m_bufinfo->rptr);
477  printf("nbuf = %d\n", m_bufinfo->nbuf);
478  printf("nattached = %d\n", m_bufinfo->nattached);
479  printf("nbusy = %d\n", m_bufinfo->nbusy);
480  printf("errtype = %d\n", m_bufinfo->errtype);
481  printf("numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
482  printf("ninsq = %d\n", m_bufinfo->ninsq);
483  printf("nremq = %d\n", m_bufinfo->ninsq);
484 }
Handles creation, locking and unlocking of System V semaphores.
Abstract base class for different kinds of events.
Internal metadata structure for RingBuffer.
Definition: RingBuffer.h:21
int size
ring buffer size (integers), minus this header.
Definition: RingBuffer.h:22