Belle II Software  light-2212-foldex
RingBuffer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/RingBuffer.h>
10 #include <framework/pcore/SemaphoreLocker.h>
11 #include <framework/logging/Logger.h>
12 
13 #include <sys/ipc.h>
14 #include <sys/shm.h>
15 
16 #include <cerrno>
17 #include <cstdio>
18 #include <unistd.h>
19 #include <cstring>
20 #include <fcntl.h>
21 #include <cstdlib>
22 #include <sys/sem.h>
23 
24 #include <fstream>
25 #include <stdexcept>
26 
27 using namespace std;
28 using namespace Belle2;
29 
30 // Constructor of Private Ringbuffer
31 RingBuffer::RingBuffer(int size)
32 {
33  openSHM(size);
34  B2DEBUG(32, "RingBuffer initialization done");
35 }
36 
37 // Constructor of Global Ringbuffer with name
38 RingBuffer::RingBuffer(const std::string& name, unsigned int nwords)
39 {
40  // 0. Determine shared memory type
41  if (name != "private") { // Global
42  m_file = true;
43  m_pathname = string("/tmp/") + getenv("USER") + "_RB_" + name;
44  m_pathfd = open(m_pathname.c_str(), O_CREAT | O_EXCL | O_RDWR, 0644);
45  if (m_pathfd > 0) { // a new shared memory file created
46  B2DEBUG(32, "[RingBuffer] Creating a ring buffer with key " << name);
47  m_new = true;
48  } else if (m_pathfd == -1 && errno == EEXIST) { // shm already there
49  B2DEBUG(32, "[RingBuffer] Attaching the ring buffer with key " << name);
50  m_new = false;
51  } else {
52  B2FATAL("RingBuffer: error opening shm file: " << m_pathname);
53  }
54  m_shmkey = ftok(m_pathname.c_str(), 1);
55  m_semkey = ftok(m_pathname.c_str(), 2);
56  } else { // Private
57  B2DEBUG(32, "[RingBuffer] Opening private ring buffer");
58  }
59 
60  openSHM(nwords);
61 
62  if (m_pathfd > 0) {
63  B2DEBUG(32, "First global RingBuffer creation: writing SHM info to file.");
64  char rbufinfo[256];
65  snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_shmid);
66  int is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
67  if (is < 0) perror("write");
68  snprintf(rbufinfo, sizeof(rbufinfo), "%d\n", m_semid);
69  is = write(m_pathfd, rbufinfo, strlen(rbufinfo));
70  if (is < 0) perror("write");
71  close(m_pathfd);
72  }
73 
74 
75  B2DEBUG(32, "RingBuffer initialization done with shm=" << m_shmid);
76 }
77 
78 RingBuffer::~RingBuffer()
79 {
80  cleanup();
81 }
82 
83 void RingBuffer::openSHM(int nwords)
84 {
85  // 1. Open shared memory
86  unsigned int sizeBytes = nwords * sizeof(int);
87  const auto mode = IPC_CREAT | 0644;
88  m_shmid = shmget(m_shmkey, sizeBytes, mode);
89  if (m_shmid < 0) {
90  unsigned int maxSizeBytes = sizeBytes;
91  ifstream shmax("/proc/sys/kernel/shmmax");
92  if (shmax.good())
93  shmax >> maxSizeBytes;
94  shmax.close();
95 
96  B2WARNING("RingBuffer: shmget(" << sizeBytes << ") failed, limiting to system maximum: " << maxSizeBytes);
97  sizeBytes = maxSizeBytes;
98  nwords = maxSizeBytes / sizeof(int);
99  m_shmid = shmget(m_shmkey, sizeBytes, mode);
100  if (m_shmid < 0) {
101  B2FATAL("RingBuffer: shmget(" << sizeBytes <<
102  ") failed. Most likely the system doesn't allow us to reserve the needed shared memory. Try 'echo 500000000 > /proc/sys/kernel/shmmax' as root to set a higher limit (500MB).");
103  }
104  }
105  m_shmadr = (int*) shmat(m_shmid, nullptr, 0);
106  if (m_shmadr == (int*) - 1) {
107  B2FATAL("RingBuffer: Attaching to shared memory segment via shmat() failed");
108  }
109 
110  // 2. Open Semaphore
111  m_semid = SemaphoreLocker::create(m_semkey);
112  if (m_semid < 0) {
113  cleanup();
114  B2FATAL("Aborting execution because we couldn't create a semaphore (see previous error messages for details).");
115  }
116  SemaphoreLocker locker(m_semid); //prevent simultaneous initialization
117 
118  // 3. Initialize control parameters
119  m_shmsize = nwords;
120  m_bufinfo = reinterpret_cast<RingBufInfo*>(m_shmadr);
121  m_buftop = m_shmadr + sizeof(struct RingBufInfo);
122  if (m_new) {
123  m_bufinfo->size = m_shmsize - sizeof(struct RingBufInfo);
124  m_bufinfo->remain = m_bufinfo->size;
125  m_bufinfo->wptr = 0;
126  m_bufinfo->prevwptr = 0;
127  m_bufinfo->rptr = 0;
128  m_bufinfo->nbuf = 0;
129  m_bufinfo->semid = m_semid;
130  m_bufinfo->nattached = 1;
131  m_bufinfo->nbusy = 0;
132  m_bufinfo->errtype = 0;
133  m_bufinfo->numAttachedTx = -1;
134  m_bufinfo->ninsq = 0;
135  m_bufinfo->nremq = 0;
136  } else {
137  m_bufinfo->nattached++;
138 
139  B2DEBUG(32, "[RingBuffer] check entries = " << m_bufinfo->nbuf);
140  B2DEBUG(32, "[RingBuffer] check size = " << m_bufinfo->size);
141  }
142 
143  m_remq_counter = 0;
144  m_insq_counter = 0;
145 
146  if (m_new) {
147  // Leave id of shm and semaphore in file name
148  m_semshmFileName = "/tmp/SHM" + to_string(m_shmid) + "-SEM" + to_string(m_semid) + "-UNNAMED";
149  int fd = open(m_semshmFileName.c_str(), O_CREAT | O_TRUNC | O_RDWR, 0644);
150  if (fd < 0) {
151  B2WARNING("RingBuffer ID file could not be created.");
152  } else {
153  close(fd);
154  }
155  }
156 
157  B2DEBUG(35, "buftop = " << m_buftop << ", end = " << (m_buftop + m_bufinfo->size));
158 }
159 
160 void RingBuffer::cleanup()
161 {
162  if (m_procIsBusy) {
163  SemaphoreLocker locker(m_semid);
164  m_bufinfo->nbusy--;
165  m_procIsBusy = false;
166  }
167 
168  shmdt(m_shmadr);
169  B2DEBUG(32, "RingBuffer: Cleaning up IPC");
170  if (m_new) {
171  shmctl(m_shmid, IPC_RMID, (struct shmid_ds*) nullptr);
172  SemaphoreLocker::destroy(m_semid);
173  if (m_file) {
174  unlink(m_pathname.c_str());
175  }
176  unlink(m_semshmFileName.c_str());
177  }
178 }
179 
180 void RingBuffer::dump_db()
181 {
182  printf("bufsize=%d, remain=%d, wptr=%d, rptr=%d, nbuf=%d\n",
183  m_bufinfo->size, m_bufinfo->remain,
184  m_bufinfo->wptr, m_bufinfo->rptr, m_bufinfo->nbuf);
185 }
186 
187 int RingBuffer::insq(const int* buf, int size, bool checkTx)
188 {
189  if (size <= 0) {
190  B2FATAL("RingBuffer::insq() failed: invalid buffer size = " << size);
191  }
192  if (m_bufinfo->numAttachedTx == 0 and checkTx) {
193  //safe abort was requested
194  B2WARNING("Number of attached Tx is 0, so I will not go on with the processing.");
195  exit(0);
196  }
197  SemaphoreLocker locker(m_semid);
198  if (m_bufinfo->nbuf == 0) {
199  m_bufinfo->wptr = 0;
200  m_bufinfo->rptr = 0;
201  if (size > m_bufinfo->size + 2) {
202  throw std::runtime_error("[RingBuffer::insq ()] Inserted item (size: " + std::to_string(size) +
203  ") is larger than RingBuffer (size: " + std::to_string(m_bufinfo->size + 2) + ")!");
204  }
205  m_bufinfo->errtype = 0;
206  int* wptr = m_buftop + m_bufinfo->wptr;
207  *wptr = size;
208  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
209  memcpy(wptr + 2, buf, size * sizeof(int));
210  m_bufinfo->prevwptr = m_bufinfo->wptr;
211  m_bufinfo->wptr += (size + 2);
212  m_bufinfo->nbuf++;
213  m_bufinfo->ninsq++;
214  m_insq_counter++;
215  return size;
216  } else if (m_bufinfo->wptr > m_bufinfo->rptr) {
217  if (m_bufinfo->errtype != 4 &&
218  m_bufinfo->errtype != 3 && m_bufinfo->errtype != 0) {
219  B2ERROR("insq: Error in errtype 0; current=" << m_bufinfo->errtype);
220  return -1;
221  }
222  if (m_bufinfo->errtype == 3) {
223  // printf ( "---> errtype is 3, still remaining buffers\n" );
224  B2DEBUG(32, "[RingBuffer] errtype 3");
225  return -1;
226  } else if (m_bufinfo->errtype == 4) {
227  // printf ( "---> errtype returned to 0, wptr=%d, rptr=%d\n",
228  // m_bufinfo->wptr, m_bufinfo->rptr );
229  }
230  m_bufinfo->errtype = 0;
231  if (size + 2 < m_bufinfo->size - m_bufinfo->wptr) { // normal case
232  int* wptr = m_buftop + m_bufinfo->wptr;
233  *wptr = size;
234  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
235  memcpy(wptr + 2, buf, size * sizeof(int));
236  m_bufinfo->prevwptr = m_bufinfo->wptr;
237  m_bufinfo->wptr += (size + 2);
238  m_bufinfo->nbuf++;
239  m_bufinfo->ninsq++;
240  m_insq_counter++;
241  return size;
242  } else {
243  if (m_bufinfo->rptr >= size + 2) { // buffer full and wptr>rptr
244  // this should be dead code but I don't understand it well enough to delete it
245  if (m_bufinfo->errtype != 0) {
246  B2ERROR("insq: Error in errtype 1; current=" << m_bufinfo->errtype);
247  return -1;
248  }
249  m_bufinfo->errtype = 1;
250  int* wptr = m_buftop;
251  memcpy(wptr + 2, buf, size * sizeof(int));
252  *wptr = size;
253  *(wptr + 1) = size + 2;
254  m_bufinfo->wptr = *(wptr + 1);
255  int* prevptr = m_buftop + m_bufinfo->prevwptr;
256  *(prevptr + 1) = 0;
257  m_bufinfo->prevwptr = 0;
258  if (m_bufinfo->nbuf == 0) {
259  m_bufinfo->errtype = 4;
260  m_bufinfo->rptr = 0;
261  }
262  m_bufinfo->nbuf++;
263  // printf ( "insq: no more space, space below rptr; prev=%d, new=%d\n",
264  // m_bufinfo->prevwptr, m_bufinfo->wptr );
265  m_bufinfo->ninsq++;
266  m_insq_counter++;
267  return size;
268  } else {
269  // printf ( "insq: wptr>rptr, no more space, no space below rptr(%d), readbuf=%d\n",
270  // m_bufinfo->rptr, m_bufinfo->readbuf );
271  return -1;
272  }
273  }
274  } else { // wptr < rptr
275  if (m_bufinfo->wptr + size + 2 < m_bufinfo->rptr &&
276  size + 2 < m_bufinfo->size - m_bufinfo->rptr) {
277  if (m_bufinfo->errtype != 1 && m_bufinfo->errtype != 2 &&
278  m_bufinfo->errtype != 3) {
279  B2ERROR("insq: Error in errtype 2; current=" << m_bufinfo->errtype);
280  return -1;
281  }
282  m_bufinfo->errtype = 2;
283  int* wptr = m_buftop + m_bufinfo->wptr;
284  *wptr = size;
285  *(wptr + 1) = m_bufinfo->wptr + (size + 2);
286  memcpy(wptr + 2, buf, size * sizeof(int));
287  m_bufinfo->prevwptr = m_bufinfo->wptr;
288  m_bufinfo->wptr += (size + 2);
289  m_bufinfo->nbuf++;
290  // printf ( "insq: wptr<rptr and enough space below rptr; curr=%d, next=%d, rptr=%d\n", m_bufinfo->prevwptr, m_bufinfo->wptr, m_bufinfo->rptr );
291  if (m_bufinfo->wptr > m_bufinfo->rptr) {
292  B2DEBUG(32, "next pointer will exceed rptr.....");
293  m_bufinfo->errtype = 3;
294  }
295  m_bufinfo->ninsq++;
296  m_insq_counter++;
297  return size;
298  } else {
299  // printf ( "insq: wptr<rptr; no more space below rptr(%d), wptr(%d)\n",
300  // m_bufinfo->rptr, m_bufinfo->wptr );
301  return -1;
302  }
303  }
304 }
305 
306 int RingBuffer::remq(int* buf)
307 {
308  SemaphoreLocker locker(m_semid);
309  if (m_bufinfo->nbuf < 0) {
310  throw std::runtime_error("[RingBuffer::remq ()] number of entries is negative: " + std::to_string(m_bufinfo->nbuf));
311  }
312  if (m_bufinfo->nbuf == 0) {
313  if (m_procIsBusy) {
314  m_bufinfo->nbusy--;
315  m_procIsBusy = false;
316  }
317  return 0;
318  }
319  int* r_ptr = m_buftop + m_bufinfo->rptr;
320  int nw = *r_ptr;
321  if (nw <= 0) {
322  printf("RingBuffer::remq : buffer size = %d, skipped\n", nw);
323  printf("RingBuffer::remq : entries = %d\n", m_bufinfo->nbuf);
324  if (m_procIsBusy) {
325  m_bufinfo->nbusy--;
326  m_procIsBusy = false;
327  }
328  return 0;
329  }
330  if (buf)
331  memcpy(buf, r_ptr + 2, nw * sizeof(int));
332  m_bufinfo->rptr = *(r_ptr + 1);
333  if (m_bufinfo->rptr == 0)
334  m_bufinfo->errtype = 4;
335  m_bufinfo->nbuf--;
336  m_bufinfo->nremq++;
337  m_remq_counter++;
338 
339  if (not m_procIsBusy) {
340  m_bufinfo->nbusy++;
341  m_procIsBusy = true;
342  }
343  return nw;
344 }
345 
346 int RingBuffer::spyq(int* buf) const
347 {
348  SemaphoreLocker locker(m_semid);
349  if (m_bufinfo->nbuf <= 0) {
350  return 0;
351  }
352  int* r_ptr = m_buftop + m_bufinfo->rptr;
353  int nw = *r_ptr;
354  if (nw <= 0) {
355  printf("RingBuffer::spyq : buffer size = %d, skipped\n", nw);
356  printf("RingBuffer::spyq : entries = %d\n", m_bufinfo->nbuf);
357  return 0;
358  }
359  // printf ( "remq : taking buf from %d(%d)\n", m_bufinfo->rptr, nw );
360  // Copy buffer without modifying management parameters.
361  memcpy(buf, r_ptr + 2, nw * sizeof(int));
362  // Exit
363  return nw;
364 }
365 
366 int RingBuffer::numq() const
367 {
368  return m_bufinfo->nbuf;
369 }
370 
371 void RingBuffer::txAttached()
372 {
373  SemaphoreLocker locker(m_semid);
374  if (m_bufinfo->numAttachedTx == -1) //first attach
375  m_bufinfo->numAttachedTx = 0;
376 
377  m_bufinfo->numAttachedTx++;
378 }
379 void RingBuffer::txDetached()
380 {
381  SemaphoreLocker locker(m_semid);
382  m_bufinfo->numAttachedTx--;
383  if (m_bufinfo->numAttachedTx < 0) {
384  m_bufinfo->numAttachedTx = 0;
385  }
386 }
387 void RingBuffer::kill()
388 {
389  m_bufinfo->numAttachedTx = 0;
390  m_bufinfo->nbuf = 0;
391 }
392 bool RingBuffer::isDead() const
393 {
394  SemaphoreLocker locker(m_semid);
395  //NOTE: numAttachedTx == -1 also means we should read data (i.e. initialization pending)
396  return (m_bufinfo->numAttachedTx == 0) and (m_bufinfo->nbuf <= 0);
397 }
398 bool RingBuffer::allRxWaiting() const
399 {
400  SemaphoreLocker locker(m_semid);
401  return (m_bufinfo->nbusy == 0) and (m_bufinfo->nbuf == 0);
402 }
403 
404 int RingBuffer::ninsq() const
405 {
406  return m_bufinfo->ninsq;
407 }
408 
409 int RingBuffer::nremq() const
410 {
411  return m_bufinfo->nremq;
412 }
413 
414 int RingBuffer::insq_counter() const
415 {
416  return m_insq_counter;
417 }
418 
419 int RingBuffer::remq_counter() const
420 {
421  return m_remq_counter;
422 }
423 
424 int RingBuffer::clear()
425 {
426  SemaphoreLocker locker(m_semid);
427  // m_bufinfo->size = m_shmsize - sizeof ( struct RingBufInfo );
428  m_bufinfo->remain = m_bufinfo->size;
429  m_bufinfo->wptr = 0;
430  m_bufinfo->prevwptr = 0;
431  m_bufinfo->rptr = 0;
432  m_bufinfo->nbuf = 0;
433  m_bufinfo->ninsq = 0;
434  m_bufinfo->nremq = 0;
435 
436  return 0;
437 }
438 
439 void RingBuffer::forceClear()
440 {
441  int val = 1;
442  if (semctl(m_semid, 0, SETVAL, val) == -1) { //set 0th semaphore to semval
443  B2ERROR("Initializing semaphore with semctl() failed.");
444  }
445  clear();
446 }
447 
448 int RingBuffer::tryClear()
449 {
450  if (SemaphoreLocker::isLocked(m_semid))
451  return 0;
452 
453  return clear();
454 }
455 
456 int RingBuffer::shmid() const
457 {
458  return m_shmid;
459 }
460 
461 void RingBuffer::dumpInfo() const
462 {
463  SemaphoreLocker locker(m_semid);
464 
465  // Dump control parameters
466  printf("***** Ring Buffer Information ***\n");
467  printf("path = %s\n", m_pathname.c_str());
468  printf("shmsize = %d\n", m_shmsize);
469  printf("[Buffer Info]\n");
470  printf("bufsize = %d\n", m_bufinfo->size);
471  printf("remain = %d\n", m_bufinfo->remain);
472  printf("wptr = %d\n", m_bufinfo->wptr);
473  printf("prevwptr = %d\n", m_bufinfo->prevwptr);
474  printf("rptr = %d\n", m_bufinfo->rptr);
475  printf("nbuf = %d\n", m_bufinfo->nbuf);
476  printf("nattached = %d\n", m_bufinfo->nattached);
477  printf("nbusy = %d\n", m_bufinfo->nbusy);
478  printf("errtype = %d\n", m_bufinfo->errtype);
479  printf("numAttachedTx = %d\n", m_bufinfo->numAttachedTx);
480  printf("ninsq = %d\n", m_bufinfo->ninsq);
481  printf("nremq = %d\n", m_bufinfo->ninsq);
482 }
Handles creation, locking and unlocking of System V semaphores.
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:23
Internal metadata structure for RingBuffer.
Definition: RingBuffer.h:21
int size
ring buffer size (integers), minus this header.
Definition: RingBuffer.h:22