Belle II Software  release-05-01-25
SharedEventBuffer.cc
1 #include "daq/storage/SharedEventBuffer.h"
2 
3 #include <daq/slc/system/LogFile.h>
4 
5 #include <cstring>
6 #include <cstdio>
7 
8 using namespace Belle2;
9 
10 size_t SharedEventBuffer::size() throw()
11 {
12  return m_mutex.size() + m_cond.size() +
13  sizeof(Header) + sizeof(int) * (m_nword);
14 }
15 
16 bool SharedEventBuffer::open(const std::string& nodename,
17  size_t nword, bool recreate)
18 {
19  m_nword = nword;
20  std::string username = getenv("USER");
21  m_path = "/storage_info_" + username + "_" + nodename;
22  LogFile::debug("%s", m_path.c_str());
23  //if (recreate) SharedMemory::unlink(m_path);
24  if (!m_memory.open(m_path, size())) {
25  perror("shm_open");
26  LogFile::fatal("Failed to open %s", m_path.c_str());
27  return false;
28  }
29  char* buf = (char*) m_memory.map(0, size());
30  if (buf == NULL) {
31  return false;
32  }
33  m_mutex = MMutex(buf);
34  buf += m_mutex.size();
35  m_cond = MCond(buf);
36  buf += m_cond.size();
37  m_header = (Header*)buf;
38  buf += sizeof(Header);
39  m_buf = (int*)buf;
40  if (recreate) init();
41  return true;
42 }
43 
44 bool SharedEventBuffer::init()
45 {
46  if (m_buf == NULL) return false;
47  m_mutex.init();
48  m_cond.init();
49  memset(m_header, 0, sizeof(Header));
50  memset(m_buf, 0, sizeof(int) * m_nword);
51  //for (unsigned long long i = 0; i < m_nword; i++) {
52  // m_buf[i] = 0;
53  //}
54  return true;
55 }
56 
57 void SharedEventBuffer::clear()
58 {
59  if (m_buf == NULL) return;
60  m_mutex.lock();
61  memset(m_header, 0, sizeof(Header));
62  for (unsigned long long i = 0; i < m_nword; i++) {
63  m_buf[i] = 0;
64  }
65  m_mutex.unlock();
66 }
67 
68 bool SharedEventBuffer::close()
69 {
70  m_memory.close();
71  return true;
72 }
73 
74 bool SharedEventBuffer::unlink()
75 {
76  m_memory.close();
77  m_memory.unlink();
78  return true;
79 }
80 
81 bool SharedEventBuffer::lock() throw()
82 {
83  if (m_buf == NULL) return false;
84  return m_mutex.lock();
85 }
86 
87 bool SharedEventBuffer::unlock() throw()
88 {
89  if (m_buf == NULL) return false;
90  return m_mutex.unlock();
91 }
92 
93 bool SharedEventBuffer::wait() throw()
94 {
95  if (m_buf == NULL) return false;
96  return m_cond.wait(m_mutex);
97 }
98 
99 bool SharedEventBuffer::wait(int time) throw()
100 {
101  if (m_buf == NULL) return false;
102  return m_cond.wait(m_mutex, time, 0);
103 }
104 
105 bool SharedEventBuffer::notify() throw()
106 {
107  if (m_buf == NULL) return false;
108  return m_cond.broadcast();
109 }
110 
111 bool SharedEventBuffer::isWritable(int nword) throw()
112 {
113  if (m_buf == NULL) return false;
114  m_mutex.lock();
115  bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
116  m_mutex.unlock();
117  return writable;
118 }
119 
120 bool SharedEventBuffer::isReadable(int nword) throw()
121 {
122  if (m_buf == NULL) return false;
123  m_mutex.lock();
124  bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
125  m_mutex.unlock();
126  return readable;
127 
128 }
129 
130 unsigned int SharedEventBuffer::write(const int* buf, unsigned int nword,
131  bool fouce, unsigned int serial, bool unlocked)
132 {
133  if (m_buf == NULL) return 0;
134  if (nword == 0) return 0;
135  if (nword > m_nword) return -1;
136  if (!unlocked) m_mutex.lock();
137  m_header->nwriter++;
138  while (!fouce && m_header->nreader > 0) {
139  m_cond.wait(m_mutex);
140  }
141  unsigned int i_w = 0;
142  unsigned int i_r = 0;
143  while (true) {
144  i_w = m_header->nword_in % m_nword;
145  i_r = m_header->nword_out % m_nword;
146  if ((serial == 0 || serial - 1 == m_header->count_in) &&
147  m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
148  if (i_w >= i_r) {
149  unsigned int count = m_nword - i_w;
150  if (nword + 1 < count) {
151  m_buf[i_w] = nword;
152  memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
153  } else {
154  m_buf[i_w] = nword;
155  memcpy((m_buf + i_w + 1), buf, sizeof(int) * count);
156  if (nword >= count)
157  memcpy(m_buf, buf + count, sizeof(int) * (nword - count));
158  }
159  } else {
160  m_buf[i_w] = nword;
161  memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
162  }
163  break;
164  }
165  m_header->nwriter--;
166  m_cond.wait(m_mutex);
167  m_header->nwriter++;
168  }
169  m_header->nword_in += nword + 1;
170  unsigned int count = ++m_header->count_in;
171  m_header->nwriter--;
172  m_cond.broadcast();
173  if (!unlocked) m_mutex.unlock();
174  return count;
175 }
176 
177 unsigned int SharedEventBuffer::read(int* buf, bool fouce, bool unlocked,
179 {
180  if (m_buf == NULL) return 0;
181  if (!unlocked) m_mutex.lock();
182  m_header->nreader++;
183  while (!fouce && m_header->nwriter > 0) {
184  m_cond.wait(m_mutex);
185  }
186  unsigned int i_w = 0;
187  unsigned int i_r = 0;
188  unsigned int nword = 0;
189  while (true) {
190  i_w = m_header->nword_in % m_nword;
191  i_r = m_header->nword_out % m_nword;
192  nword = m_buf[i_r];
193  if (nword > 0) {
194  if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
195  if (i_w > i_r) {
196  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
197  break;
198  } else if (i_w < i_r) {
199  if (m_nword - i_r > nword) {
200  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
201  break;
202  } else {
203  unsigned int count = m_nword - i_r;
204  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * count);
205  if (nword > count) {
206  memcpy(buf + count, m_buf, sizeof(int) * (nword - count));
207  }
208  break;
209  }
210  }
211  }
212  }
213  m_header->nreader--;
214  m_cond.wait(m_mutex);
215  m_header->nreader++;
216  }
217  m_header->nword_out += nword + 1;
218  unsigned int count = ++m_header->count_out;
219  m_header->nreader--;
220  if (hdr != NULL) {
221  memcpy(hdr, m_header, sizeof(SharedEventBuffer::Header));
222  }
223  m_cond.broadcast();
224  if (!unlocked) m_mutex.unlock();
225  return count;
226 }
Belle2::MMutex
Definition: MMutex.h:19
Belle2::SharedEventBuffer::Header
Definition: SharedEventBuffer.h:17
Belle2::MCond
Definition: MCond.h:14
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19