Belle II Software  release-06-02-00
SharedEventBuffer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include "daq/storage/SharedEventBuffer.h"
9 
10 #include <daq/slc/system/LogFile.h>
11 
12 #include <cstring>
13 #include <cstdio>
14 
15 using namespace Belle2;
16 
17 size_t SharedEventBuffer::size() throw()
18 {
19  return m_mutex.size() + m_cond.size() +
20  sizeof(Header) + sizeof(int) * (m_nword);
21 }
22 
23 bool SharedEventBuffer::open(const std::string& nodename,
24  size_t nword, bool recreate)
25 {
26  m_nword = nword;
27  std::string username = getenv("USER");
28  m_path = "/storage_info_" + username + "_" + nodename;
29  LogFile::debug("%s", m_path.c_str());
30  //if (recreate) SharedMemory::unlink(m_path);
31  if (!m_memory.open(m_path, size())) {
32  perror("shm_open");
33  LogFile::fatal("Failed to open %s", m_path.c_str());
34  return false;
35  }
36  char* buf = (char*) m_memory.map(0, size());
37  if (buf == NULL) {
38  return false;
39  }
40  m_mutex = MMutex(buf);
41  buf += m_mutex.size();
42  m_cond = MCond(buf);
43  buf += m_cond.size();
44  m_header = (Header*)buf;
45  buf += sizeof(Header);
46  m_buf = (int*)buf;
47  if (recreate) init();
48  return true;
49 }
50 
51 bool SharedEventBuffer::init()
52 {
53  if (m_buf == NULL) return false;
54  m_mutex.init();
55  m_cond.init();
56  memset(m_header, 0, sizeof(Header));
57  memset(m_buf, 0, sizeof(int) * m_nword);
58  //for (unsigned long long i = 0; i < m_nword; i++) {
59  // m_buf[i] = 0;
60  //}
61  return true;
62 }
63 
64 void SharedEventBuffer::clear()
65 {
66  if (m_buf == NULL) return;
67  m_mutex.lock();
68  memset(m_header, 0, sizeof(Header));
69  for (unsigned long long i = 0; i < m_nword; i++) {
70  m_buf[i] = 0;
71  }
72  m_mutex.unlock();
73 }
74 
75 bool SharedEventBuffer::close()
76 {
77  m_memory.close();
78  return true;
79 }
80 
81 bool SharedEventBuffer::unlink()
82 {
83  m_memory.close();
84  m_memory.unlink();
85  return true;
86 }
87 
88 bool SharedEventBuffer::lock() throw()
89 {
90  if (m_buf == NULL) return false;
91  return m_mutex.lock();
92 }
93 
94 bool SharedEventBuffer::unlock() throw()
95 {
96  if (m_buf == NULL) return false;
97  return m_mutex.unlock();
98 }
99 
100 bool SharedEventBuffer::wait() throw()
101 {
102  if (m_buf == NULL) return false;
103  return m_cond.wait(m_mutex);
104 }
105 
106 bool SharedEventBuffer::wait(int time) throw()
107 {
108  if (m_buf == NULL) return false;
109  return m_cond.wait(m_mutex, time, 0);
110 }
111 
112 bool SharedEventBuffer::notify() throw()
113 {
114  if (m_buf == NULL) return false;
115  return m_cond.broadcast();
116 }
117 
118 bool SharedEventBuffer::isWritable(int nword) throw()
119 {
120  if (m_buf == NULL) return false;
121  m_mutex.lock();
122  bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
123  m_mutex.unlock();
124  return writable;
125 }
126 
127 bool SharedEventBuffer::isReadable(int nword) throw()
128 {
129  if (m_buf == NULL) return false;
130  m_mutex.lock();
131  bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
132  m_mutex.unlock();
133  return readable;
134 
135 }
136 
137 unsigned int SharedEventBuffer::write(const int* buf, unsigned int nword,
138  bool fouce, unsigned int serial, bool unlocked)
139 {
140  if (m_buf == NULL) return 0;
141  if (nword == 0) return 0;
142  if (nword > m_nword) return -1;
143  if (!unlocked) m_mutex.lock();
144  m_header->nwriter++;
145  while (!fouce && m_header->nreader > 0) {
146  m_cond.wait(m_mutex);
147  }
148  unsigned int i_w = 0;
149  unsigned int i_r = 0;
150  while (true) {
151  i_w = m_header->nword_in % m_nword;
152  i_r = m_header->nword_out % m_nword;
153  if ((serial == 0 || serial - 1 == m_header->count_in) &&
154  m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
155  if (i_w >= i_r) {
156  unsigned int count = m_nword - i_w;
157  if (nword + 1 < count) {
158  m_buf[i_w] = nword;
159  memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
160  } else {
161  m_buf[i_w] = nword;
162  memcpy((m_buf + i_w + 1), buf, sizeof(int) * count);
163  if (nword >= count)
164  memcpy(m_buf, buf + count, sizeof(int) * (nword - count));
165  }
166  } else {
167  m_buf[i_w] = nword;
168  memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
169  }
170  break;
171  }
172  m_header->nwriter--;
173  m_cond.wait(m_mutex);
174  m_header->nwriter++;
175  }
176  m_header->nword_in += nword + 1;
177  unsigned int count = ++m_header->count_in;
178  m_header->nwriter--;
179  m_cond.broadcast();
180  if (!unlocked) m_mutex.unlock();
181  return count;
182 }
183 
184 unsigned int SharedEventBuffer::read(int* buf, bool fouce, bool unlocked,
186 {
187  if (m_buf == NULL) return 0;
188  if (!unlocked) m_mutex.lock();
189  m_header->nreader++;
190  while (!fouce && m_header->nwriter > 0) {
191  m_cond.wait(m_mutex);
192  }
193  unsigned int i_w = 0;
194  unsigned int i_r = 0;
195  unsigned int nword = 0;
196  while (true) {
197  i_w = m_header->nword_in % m_nword;
198  i_r = m_header->nword_out % m_nword;
199  nword = m_buf[i_r];
200  if (nword > 0) {
201  if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
202  if (i_w > i_r) {
203  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
204  break;
205  } else if (i_w < i_r) {
206  if (m_nword - i_r > nword) {
207  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
208  break;
209  } else {
210  unsigned int count = m_nword - i_r;
211  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * count);
212  if (nword > count) {
213  memcpy(buf + count, m_buf, sizeof(int) * (nword - count));
214  }
215  break;
216  }
217  }
218  }
219  }
220  m_header->nreader--;
221  m_cond.wait(m_mutex);
222  m_header->nreader++;
223  }
224  m_header->nword_out += nword + 1;
225  unsigned int count = ++m_header->count_out;
226  m_header->nreader--;
227  if (hdr != NULL) {
228  memcpy(hdr, m_header, sizeof(SharedEventBuffer::Header));
229  }
230  m_cond.broadcast();
231  if (!unlocked) m_mutex.unlock();
232  return count;
233 }
Abstract base class for different kinds of events.