Belle II Software  release-08-01-10
SharedEventBuffer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include "daq/storage/SharedEventBuffer.h"
9 
10 #include <daq/slc/system/LogFile.h>
11 
12 #include <cstring>
13 #include <cstdio>
14 
15 using namespace Belle2;
16 
17 size_t SharedEventBuffer::size() throw()
18 {
19  return m_mutex.size() + m_cond.size() +
20  sizeof(Header) + sizeof(int) * (m_nword);
21 }
22 
23 bool SharedEventBuffer::open(const std::string& nodename,
24  size_t nword, bool recreate)
25 {
26  m_nword = nword;
27  std::string username = getenv("USER");
28  m_path = "/storage_info_" + username + "_" + nodename;
29  LogFile::debug("%s", m_path.c_str());
30  //if (recreate) SharedMemory::unlink(m_path);
31  if (!m_memory.open(m_path, size())) {
32  perror("shm_open");
33  LogFile::fatal("Failed to open %s", m_path.c_str());
34  return false;
35  }
36  char* buf = (char*) m_memory.map(0, size());
37  if (buf == NULL) {
38  return false;
39  }
40  m_mutex = MMutex(buf);
41  buf += m_mutex.size();
42  m_cond = MCond(buf);
43  buf += m_cond.size();
44  m_header = reinterpret_cast<Header*>(buf);
45  buf += sizeof(Header);
46  m_buf = (int*)buf;
47  if (recreate) init();
48  return true;
49 }
50 
51 bool SharedEventBuffer::init()
52 {
53  if (m_buf == NULL) return false;
54  m_mutex.init();
55  m_cond.init();
56  memset(m_header, 0, sizeof(Header));
57  memset(m_buf, 0, sizeof(int) * m_nword);
58  //for (unsigned long long i = 0; i < m_nword; i++) {
59  // m_buf[i] = 0;
60  //}
61  return true;
62 }
63 
64 void SharedEventBuffer::clear()
65 {
66  if (m_buf == NULL) return;
67  m_mutex.lock();
68  memset(m_header, 0, sizeof(Header));
69  for (unsigned long long i = 0; i < m_nword; i++) {
70  m_buf[i] = 0;
71  }
72  m_mutex.unlock();
73 }
74 
75 bool SharedEventBuffer::close()
76 {
77  m_memory.close();
78  return true;
79 }
80 
81 bool SharedEventBuffer::unlink()
82 {
83  m_memory.close();
84  m_memory.unlink();
85  return true;
86 }
87 
88 bool SharedEventBuffer::lock() throw()
89 {
90  if (m_buf == NULL) return false;
91  return m_mutex.lock();
92 }
93 
94 bool SharedEventBuffer::unlock() throw()
95 {
96  if (m_buf == NULL) return false;
97  return m_mutex.unlock();
98 }
99 
100 bool SharedEventBuffer::wait() throw()
101 {
102  if (m_buf == NULL) return false;
103  return m_cond.wait(m_mutex);
104 }
105 
106 bool SharedEventBuffer::wait(int time) throw()
107 {
108  if (m_buf == NULL) return false;
109  return m_cond.wait(m_mutex, time, 0);
110 }
111 
112 bool SharedEventBuffer::notify() throw()
113 {
114  if (m_buf == NULL) return false;
115  return m_cond.broadcast();
116 }
117 
118 bool SharedEventBuffer::isWritable(int nword) throw()
119 {
120  if (m_buf == NULL) return false;
121  m_mutex.lock();
122  bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
123  m_mutex.unlock();
124  return writable;
125 }
126 
127 bool SharedEventBuffer::isReadable(int nword) throw()
128 {
129  if (m_buf == NULL) return false;
130  m_mutex.lock();
131  bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
132  m_mutex.unlock();
133  return readable;
134 
135 }
136 
137 unsigned int SharedEventBuffer::write(const int* buf, unsigned int nword,
138  bool fouce, unsigned int serial, bool unlocked)
139 {
140  if (m_buf == NULL) return 0;
141  if (nword == 0) return 0;
142  if (nword > m_nword) return -1;
143  if (!unlocked) m_mutex.lock();
144  m_header->nwriter++;
145  while (!fouce && m_header->nreader > 0) {
146  m_cond.wait(m_mutex);
147  }
148 
149  while (true) {
150  unsigned int i_w = m_header->nword_in % m_nword;
151  unsigned int i_r = m_header->nword_out % m_nword;
152  if ((serial == 0 || serial - 1 == m_header->count_in) &&
153  m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
154  if (i_w >= i_r) {
155  unsigned int count = m_nword - i_w;
156  if (nword + 1 < count) {
157  m_buf[i_w] = nword;
158  memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
159  } else {
160  m_buf[i_w] = nword;
161  memcpy((m_buf + i_w + 1), buf, sizeof(int) * count);
162  if (nword >= count)
163  memcpy(m_buf, buf + count, sizeof(int) * (nword - count));
164  }
165  } else {
166  m_buf[i_w] = nword;
167  memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
168  }
169  break;
170  }
171  m_header->nwriter--;
172  m_cond.wait(m_mutex);
173  m_header->nwriter++;
174  }
175  m_header->nword_in += nword + 1;
176  unsigned int count = ++m_header->count_in;
177  m_header->nwriter--;
178  m_cond.broadcast();
179  if (!unlocked) m_mutex.unlock();
180  return count;
181 }
182 
183 unsigned int SharedEventBuffer::read(int* buf, bool fouce, bool unlocked,
185 {
186  if (m_buf == NULL) return 0;
187  if (!unlocked) m_mutex.lock();
188  m_header->nreader++;
189  while (!fouce && m_header->nwriter > 0) {
190  m_cond.wait(m_mutex);
191  }
192 
193  unsigned int nword = 0;
194  while (true) {
195  unsigned int i_w = m_header->nword_in % m_nword;
196  unsigned int i_r = m_header->nword_out % m_nword;
197  nword = m_buf[i_r];
198  if (nword > 0) {
199  if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
200  if (i_w > i_r) {
201  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
202  break;
203  } else if (i_w < i_r) {
204  if (m_nword - i_r > nword) {
205  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
206  break;
207  } else {
208  unsigned int count = m_nword - i_r;
209  memcpy(buf, (m_buf + i_r + 1), sizeof(int) * count);
210  if (nword > count) {
211  memcpy(buf + count, m_buf, sizeof(int) * (nword - count));
212  }
213  break;
214  }
215  }
216  }
217  }
218  m_header->nreader--;
219  m_cond.wait(m_mutex);
220  m_header->nreader++;
221  }
222  m_header->nword_out += nword + 1;
223  unsigned int count = ++m_header->count_out;
224  m_header->nreader--;
225  if (hdr != NULL) {
226  memcpy(hdr, m_header, sizeof(SharedEventBuffer::Header));
227  }
228  m_cond.broadcast();
229  if (!unlocked) m_mutex.unlock();
230  return count;
231 }
Abstract base class for different kinds of events.