8#include "daq/storage/SharedEventBuffer.h"
10#include <daq/slc/system/LogFile.h>
17size_t SharedEventBuffer::size() throw()
19 return m_mutex.size() + m_cond.size() +
20 sizeof(Header) +
sizeof(
int) * (m_nword);
23bool SharedEventBuffer::open(
const std::string& nodename,
24 size_t nword,
bool recreate)
27 std::string username = getenv(
"USER");
28 m_path =
"/storage_info_" + username +
"_" + nodename;
29 LogFile::debug(
"%s", m_path.c_str());
31 if (!m_memory.open(m_path, size())) {
33 LogFile::fatal(
"Failed to open %s", m_path.c_str());
36 char* buf = (
char*) m_memory.map(0, size());
41 buf += m_mutex.size();
44 m_header =
reinterpret_cast<Header*
>(buf);
45 buf +=
sizeof(Header);
51bool SharedEventBuffer::init()
53 if (m_buf == NULL)
return false;
56 memset(m_header, 0,
sizeof(Header));
57 memset(m_buf, 0,
sizeof(
int) * m_nword);
64void SharedEventBuffer::clear()
66 if (m_buf == NULL)
return;
68 memset(m_header, 0,
sizeof(Header));
69 for (
unsigned long long i = 0; i < m_nword; i++) {
75bool SharedEventBuffer::close()
81bool SharedEventBuffer::unlink()
88bool SharedEventBuffer::lock() throw()
90 if (m_buf == NULL)
return false;
91 return m_mutex.lock();
94bool SharedEventBuffer::unlock() throw()
96 if (m_buf == NULL)
return false;
97 return m_mutex.unlock();
100bool SharedEventBuffer::wait() throw()
102 if (m_buf == NULL)
return false;
103 return m_cond.wait(m_mutex);
106bool SharedEventBuffer::wait(
int time)
throw()
108 if (m_buf == NULL)
return false;
109 return m_cond.wait(m_mutex, time, 0);
112bool SharedEventBuffer::notify() throw()
114 if (m_buf == NULL)
return false;
115 return m_cond.broadcast();
118bool SharedEventBuffer::isWritable(
int nword)
throw()
120 if (m_buf == NULL)
return false;
122 bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
127bool SharedEventBuffer::isReadable(
int nword)
throw()
129 if (m_buf == NULL)
return false;
131 bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
137unsigned int SharedEventBuffer::write(
const int* buf,
unsigned int nword,
138 bool fouce,
unsigned int serial,
bool unlocked)
140 if (m_buf == NULL)
return 0;
141 if (nword == 0)
return 0;
142 if (nword > m_nword)
return -1;
143 if (!unlocked) m_mutex.lock();
145 while (!fouce && m_header->nreader > 0) {
146 m_cond.wait(m_mutex);
150 unsigned int i_w = m_header->nword_in % m_nword;
151 unsigned int i_r = m_header->nword_out % m_nword;
152 if ((serial == 0 || serial - 1 == m_header->count_in) &&
153 m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
155 unsigned int count = m_nword - i_w;
156 if (nword + 1 < count) {
158 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * nword);
161 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * count);
163 memcpy(m_buf, buf + count,
sizeof(
int) * (nword - count));
167 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * nword);
172 m_cond.wait(m_mutex);
175 m_header->nword_in += nword + 1;
176 unsigned int count = ++m_header->count_in;
179 if (!unlocked) m_mutex.unlock();
183unsigned int SharedEventBuffer::read(
int* buf,
bool fouce,
bool unlocked,
186 if (m_buf == NULL)
return 0;
187 if (!unlocked) m_mutex.lock();
189 while (!fouce && m_header->nwriter > 0) {
190 m_cond.wait(m_mutex);
193 unsigned int nword = 0;
195 unsigned int i_w = m_header->nword_in % m_nword;
196 unsigned int i_r = m_header->nword_out % m_nword;
199 if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
201 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * nword);
203 }
else if (i_w < i_r) {
204 if (m_nword - i_r > nword) {
205 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * nword);
208 unsigned int count = m_nword - i_r;
209 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * count);
211 memcpy(buf + count, m_buf,
sizeof(
int) * (nword - count));
219 m_cond.wait(m_mutex);
222 m_header->nword_out += nword + 1;
223 unsigned int count = ++m_header->count_out;
229 if (!unlocked) m_mutex.unlock();
Abstract base class for different kinds of events.