Belle II Software development
SharedEventBuffer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include "daq/storage/SharedEventBuffer.h"
9
10#include <daq/slc/system/LogFile.h>
11
12#include <cstring>
13#include <cstdio>
14
15using namespace Belle2;
16
17size_t SharedEventBuffer::size() throw()
18{
19 return m_mutex.size() + m_cond.size() +
20 sizeof(Header) + sizeof(int) * (m_nword);
21}
22
23bool SharedEventBuffer::open(const std::string& nodename,
24 size_t nword, bool recreate)
25{
26 m_nword = nword;
27 std::string username = getenv("USER");
28 m_path = "/storage_info_" + username + "_" + nodename;
29 LogFile::debug("%s", m_path.c_str());
30 //if (recreate) SharedMemory::unlink(m_path);
31 if (!m_memory.open(m_path, size())) {
32 perror("shm_open");
33 LogFile::fatal("Failed to open %s", m_path.c_str());
34 return false;
35 }
36 char* buf = (char*) m_memory.map(0, size());
37 if (buf == NULL) {
38 return false;
39 }
40 m_mutex = MMutex(buf);
41 buf += m_mutex.size();
42 m_cond = MCond(buf);
43 buf += m_cond.size();
44 m_header = reinterpret_cast<Header*>(buf);
45 buf += sizeof(Header);
46 m_buf = (int*)buf;
47 if (recreate) init();
48 return true;
49}
50
51bool SharedEventBuffer::init()
52{
53 if (m_buf == NULL) return false;
54 m_mutex.init();
55 m_cond.init();
56 memset(m_header, 0, sizeof(Header));
57 memset(m_buf, 0, sizeof(int) * m_nword);
58 //for (unsigned long long i = 0; i < m_nword; i++) {
59 // m_buf[i] = 0;
60 //}
61 return true;
62}
63
64void SharedEventBuffer::clear()
65{
66 if (m_buf == NULL) return;
67 m_mutex.lock();
68 memset(m_header, 0, sizeof(Header));
69 for (unsigned long long i = 0; i < m_nword; i++) {
70 m_buf[i] = 0;
71 }
72 m_mutex.unlock();
73}
74
75bool SharedEventBuffer::close()
76{
77 m_memory.close();
78 return true;
79}
80
81bool SharedEventBuffer::unlink()
82{
83 m_memory.close();
84 m_memory.unlink();
85 return true;
86}
87
88bool SharedEventBuffer::lock() throw()
89{
90 if (m_buf == NULL) return false;
91 return m_mutex.lock();
92}
93
94bool SharedEventBuffer::unlock() throw()
95{
96 if (m_buf == NULL) return false;
97 return m_mutex.unlock();
98}
99
100bool SharedEventBuffer::wait() throw()
101{
102 if (m_buf == NULL) return false;
103 return m_cond.wait(m_mutex);
104}
105
106bool SharedEventBuffer::wait(int time) throw()
107{
108 if (m_buf == NULL) return false;
109 return m_cond.wait(m_mutex, time, 0);
110}
111
112bool SharedEventBuffer::notify() throw()
113{
114 if (m_buf == NULL) return false;
115 return m_cond.broadcast();
116}
117
118bool SharedEventBuffer::isWritable(int nword) throw()
119{
120 if (m_buf == NULL) return false;
121 m_mutex.lock();
122 bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
123 m_mutex.unlock();
124 return writable;
125}
126
127bool SharedEventBuffer::isReadable(int nword) throw()
128{
129 if (m_buf == NULL) return false;
130 m_mutex.lock();
131 bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
132 m_mutex.unlock();
133 return readable;
134
135}
136
137unsigned int SharedEventBuffer::write(const int* buf, unsigned int nword,
138 bool fouce, unsigned int serial, bool unlocked)
139{
140 if (m_buf == NULL) return 0;
141 if (nword == 0) return 0;
142 if (nword > m_nword) return -1;
143 if (!unlocked) m_mutex.lock();
144 m_header->nwriter++;
145 while (!fouce && m_header->nreader > 0) {
146 m_cond.wait(m_mutex);
147 }
148
149 while (true) {
150 unsigned int i_w = m_header->nword_in % m_nword;
151 unsigned int i_r = m_header->nword_out % m_nword;
152 if ((serial == 0 || serial - 1 == m_header->count_in) &&
153 m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
154 if (i_w >= i_r) {
155 unsigned int count = m_nword - i_w;
156 if (nword + 1 < count) {
157 m_buf[i_w] = nword;
158 memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
159 } else {
160 m_buf[i_w] = nword;
161 memcpy((m_buf + i_w + 1), buf, sizeof(int) * count);
162 if (nword >= count)
163 memcpy(m_buf, buf + count, sizeof(int) * (nword - count));
164 }
165 } else {
166 m_buf[i_w] = nword;
167 memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
168 }
169 break;
170 }
171 m_header->nwriter--;
172 m_cond.wait(m_mutex);
173 m_header->nwriter++;
174 }
175 m_header->nword_in += nword + 1;
176 unsigned int count = ++m_header->count_in;
177 m_header->nwriter--;
178 m_cond.broadcast();
179 if (!unlocked) m_mutex.unlock();
180 return count;
181}
182
183unsigned int SharedEventBuffer::read(int* buf, bool fouce, bool unlocked,
185{
186 if (m_buf == NULL) return 0;
187 if (!unlocked) m_mutex.lock();
188 m_header->nreader++;
189 while (!fouce && m_header->nwriter > 0) {
190 m_cond.wait(m_mutex);
191 }
192
193 unsigned int nword = 0;
194 while (true) {
195 unsigned int i_w = m_header->nword_in % m_nword;
196 unsigned int i_r = m_header->nword_out % m_nword;
197 nword = m_buf[i_r];
198 if (nword > 0) {
199 if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
200 if (i_w > i_r) {
201 memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
202 break;
203 } else if (i_w < i_r) {
204 if (m_nword - i_r > nword) {
205 memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
206 break;
207 } else {
208 unsigned int count = m_nword - i_r;
209 memcpy(buf, (m_buf + i_r + 1), sizeof(int) * count);
210 if (nword > count) {
211 memcpy(buf + count, m_buf, sizeof(int) * (nword - count));
212 }
213 break;
214 }
215 }
216 }
217 }
218 m_header->nreader--;
219 m_cond.wait(m_mutex);
220 m_header->nreader++;
221 }
222 m_header->nword_out += nword + 1;
223 unsigned int count = ++m_header->count_out;
224 m_header->nreader--;
225 if (hdr != NULL) {
226 memcpy(hdr, m_header, sizeof(SharedEventBuffer::Header));
227 }
228 m_cond.broadcast();
229 if (!unlocked) m_mutex.unlock();
230 return count;
231}
Abstract base class for different kinds of events.