8 #include "daq/storage/SharedEventBuffer.h"
10 #include <daq/slc/system/LogFile.h>
17 size_t SharedEventBuffer::size() throw()
19 return m_mutex.size() + m_cond.size() +
20 sizeof(Header) +
sizeof(
int) * (m_nword);
23 bool SharedEventBuffer::open(
const std::string& nodename,
24 size_t nword,
bool recreate)
27 std::string username = getenv(
"USER");
28 m_path =
"/storage_info_" + username +
"_" + nodename;
29 LogFile::debug(
"%s", m_path.c_str());
31 if (!m_memory.open(m_path, size())) {
33 LogFile::fatal(
"Failed to open %s", m_path.c_str());
36 char* buf = (
char*) m_memory.map(0, size());
41 buf += m_mutex.size();
44 m_header = (Header*)buf;
45 buf +=
sizeof(Header);
51 bool SharedEventBuffer::init()
53 if (m_buf == NULL)
return false;
56 memset(m_header, 0,
sizeof(Header));
57 memset(m_buf, 0,
sizeof(
int) * m_nword);
64 void SharedEventBuffer::clear()
66 if (m_buf == NULL)
return;
68 memset(m_header, 0,
sizeof(Header));
69 for (
unsigned long long i = 0; i < m_nword; i++) {
75 bool SharedEventBuffer::close()
81 bool SharedEventBuffer::unlink()
88 bool SharedEventBuffer::lock() throw()
90 if (m_buf == NULL)
return false;
91 return m_mutex.lock();
94 bool SharedEventBuffer::unlock() throw()
96 if (m_buf == NULL)
return false;
97 return m_mutex.unlock();
100 bool SharedEventBuffer::wait() throw()
102 if (m_buf == NULL)
return false;
103 return m_cond.wait(m_mutex);
106 bool SharedEventBuffer::wait(
int time)
throw()
108 if (m_buf == NULL)
return false;
109 return m_cond.wait(m_mutex, time, 0);
112 bool SharedEventBuffer::notify() throw()
114 if (m_buf == NULL)
return false;
115 return m_cond.broadcast();
118 bool SharedEventBuffer::isWritable(
int nword)
throw()
120 if (m_buf == NULL)
return false;
122 bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
127 bool SharedEventBuffer::isReadable(
int nword)
throw()
129 if (m_buf == NULL)
return false;
131 bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
137 unsigned int SharedEventBuffer::write(
const int* buf,
unsigned int nword,
138 bool fouce,
unsigned int serial,
bool unlocked)
140 if (m_buf == NULL)
return 0;
141 if (nword == 0)
return 0;
142 if (nword > m_nword)
return -1;
143 if (!unlocked) m_mutex.lock();
145 while (!fouce && m_header->nreader > 0) {
146 m_cond.wait(m_mutex);
148 unsigned int i_w = 0;
149 unsigned int i_r = 0;
151 i_w = m_header->nword_in % m_nword;
152 i_r = m_header->nword_out % m_nword;
153 if ((serial == 0 || serial - 1 == m_header->count_in) &&
154 m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
156 unsigned int count = m_nword - i_w;
157 if (nword + 1 < count) {
159 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * nword);
162 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * count);
164 memcpy(m_buf, buf + count,
sizeof(
int) * (nword - count));
168 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * nword);
173 m_cond.wait(m_mutex);
176 m_header->nword_in += nword + 1;
177 unsigned int count = ++m_header->count_in;
180 if (!unlocked) m_mutex.unlock();
184 unsigned int SharedEventBuffer::read(
int* buf,
bool fouce,
bool unlocked,
187 if (m_buf == NULL)
return 0;
188 if (!unlocked) m_mutex.lock();
190 while (!fouce && m_header->nwriter > 0) {
191 m_cond.wait(m_mutex);
193 unsigned int i_w = 0;
194 unsigned int i_r = 0;
195 unsigned int nword = 0;
197 i_w = m_header->nword_in % m_nword;
198 i_r = m_header->nword_out % m_nword;
201 if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
203 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * nword);
205 }
else if (i_w < i_r) {
206 if (m_nword - i_r > nword) {
207 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * nword);
210 unsigned int count = m_nword - i_r;
211 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * count);
213 memcpy(buf + count, m_buf,
sizeof(
int) * (nword - count));
221 m_cond.wait(m_mutex);
224 m_header->nword_out += nword + 1;
225 unsigned int count = ++m_header->count_out;
231 if (!unlocked) m_mutex.unlock();
Abstract base class for different kinds of events.