1 #include "daq/storage/SharedEventBuffer.h"
3 #include <daq/slc/system/LogFile.h>
10 size_t SharedEventBuffer::size() throw()
12 return m_mutex.size() + m_cond.size() +
13 sizeof(Header) +
sizeof(
int) * (m_nword);
// Opens the shared-memory object belonging to node `nodename` and starts
// laying out this buffer's members inside the mapping.
// NOTE(review): only part of this function is visible in this view; how
// `nword` and `recreate` are used, and what is returned, cannot be stated
// from here.
16 bool SharedEventBuffer::open(
const std::string& nodename,
17 size_t nword,
bool recreate)
// NOTE(review): getenv() returns a null pointer when USER is not set, and
// constructing a std::string from a null pointer is undefined behaviour.
// Confirm USER is guaranteed wherever this runs.
20 std::string username = getenv(
"USER");
// Object name has the form "/storage_info_<USER>_<nodename>".
21 m_path =
"/storage_info_" + username +
"_" + nodename;
22 LogFile::debug(
"%s", m_path.c_str());
// Open the object, passing the byte count reported by size(); a failure is
// logged at fatal level.
24 if (!m_memory.open(m_path, size())) {
26 LogFile::fatal(
"Failed to open %s", m_path.c_str());
// Map the region (arguments 0 and size(); presumably offset and length —
// TODO confirm) and walk through it with a byte cursor.
29 char* buf = (
char*) m_memory.map(0, size());
// Step the cursor over m_mutex.size() bytes.
34 buf += m_mutex.size();
// The Header lives at the current cursor position; then step past it.
37 m_header = (Header*)buf;
38 buf +=
sizeof(Header);
// Zero-fills the header and the word buffer.
// Returns false immediately when m_buf is NULL.
// NOTE(review): the remaining statements of this function (including the
// success return) are not visible in this view.
44 bool SharedEventBuffer::init()
46 if (m_buf == NULL)
return false;
49 memset(m_header, 0,
sizeof(Header));
// m_buf is cleared as m_nword elements of sizeof(int) bytes each.
50 memset(m_buf, 0,
sizeof(
int) * m_nword);
// Resets the header to all zero bytes and then iterates over every word
// index of the buffer. Does nothing when m_buf is NULL.
// NOTE(review): the loop body is not visible in this view; presumably it
// zeroes m_buf[i] — confirm.
57 void SharedEventBuffer::clear()
59 if (m_buf == NULL)
return;
61 memset(m_header, 0,
sizeof(Header));
// Index runs over [0, m_nword).
62 for (
unsigned long long i = 0; i < m_nword; i++) {
68 bool SharedEventBuffer::close()
74 bool SharedEventBuffer::unlink()
81 bool SharedEventBuffer::lock() throw()
83 if (m_buf == NULL)
return false;
84 return m_mutex.lock();
87 bool SharedEventBuffer::unlock() throw()
89 if (m_buf == NULL)
return false;
90 return m_mutex.unlock();
93 bool SharedEventBuffer::wait() throw()
95 if (m_buf == NULL)
return false;
96 return m_cond.wait(m_mutex);
99 bool SharedEventBuffer::wait(
int time)
throw()
101 if (m_buf == NULL)
return false;
102 return m_cond.wait(m_mutex, time, 0);
105 bool SharedEventBuffer::notify() throw()
107 if (m_buf == NULL)
return false;
108 return m_cond.broadcast();
// Tests whether the buffer currently has room for an event of `nword` words.
// Returns false when m_buf is NULL.
// NOTE(review): the lines following the comparison (any locking and the
// return statement) are not visible in this view.
111 bool SharedEventBuffer::isWritable(
int nword)
throw()
113 if (m_buf == NULL)
return false;
// Words in use (nword_in - nword_out) must be below the capacity minus the
// nword + 1 words that write() accounts for per event.
115 bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
// Computes a "readable" flag from the header counters for a request of
// `nword` words. Returns false when m_buf is NULL.
// NOTE(review): the lines following the comparison (any locking and the
// return statement) are not visible in this view.
120 bool SharedEventBuffer::isReadable(
int nword)
throw()
122 if (m_buf == NULL)
return false;
// NOTE(review): this is the exact negation of the isWritable() condition
// (used words >= m_nword - (nword + 1)), whereas read() tests
// `nword_in - nword_out >= (nword + 1)`. Confirm that this is intended.
124 bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
// Copies `nword` words from `buf` into the circular word buffer m_buf and
// advances the input counters in the header.
//   buf      - source words.
//   nword    - number of words to copy; 0 makes the call return 0 at once.
//   fouce    - when true, skips the initial wait on m_header->nreader
//              (name is presumably a misspelling of "force").
//   serial   - 0, or a value that must equal m_header->count_in + 1 for the
//              copy to take place.
//   unlocked - when true, this function neither locks nor unlocks m_mutex
//              (presumably the caller already holds it — TODO confirm).
// NOTE(review): parts of the body, including loop/branch structure and the
// final return, are not visible in this view; the comments below describe
// only the visible statements.
130 unsigned int SharedEventBuffer::write(
const int* buf,
unsigned int nword,
131 bool fouce,
unsigned int serial,
bool unlocked)
133 if (m_buf == NULL)
return 0;
134 if (nword == 0)
return 0;
// NOTE(review): the return type is unsigned int, so -1 reaches the caller as
// the largest unsigned int value.
135 if (nword > m_nword)
return -1;
136 if (!unlocked) m_mutex.lock();
// Unless forced, wait on the condition variable while nreader is non-zero.
138 while (!fouce && m_header->nreader > 0) {
139 m_cond.wait(m_mutex);
141 unsigned int i_w = 0;
142 unsigned int i_r = 0;
// Positions inside m_buf derived from the running word counters.
144 i_w = m_header->nword_in % m_nword;
145 i_r = m_header->nword_out % m_nword;
// Proceed only if the serial matches (or is 0) and the words in use leave
// room for nword + 1 more.
146 if ((serial == 0 || serial - 1 == m_header->count_in) &&
147 m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
// Words between the write position and the end of m_buf.
149 unsigned int count = m_nword - i_w;
150 if (nword + 1 < count) {
// Payload is stored starting one word after i_w; the word at i_w itself is
// not written by any visible line (presumably it holds the length — confirm).
152 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * nword);
// Two-part copy: `count` words at the write position, the rest at the start
// of m_buf.
// NOTE(review): starting at i_w + 1 and copying m_nword - i_w words would end
// one word past index m_nword - 1 — verify against the elided branch logic
// and the actual allocation.
155 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * count);
157 memcpy(m_buf, buf + count,
sizeof(
int) * (nword - count));
// Further single-piece copy; the branch it belongs to is not visible here.
161 memcpy((m_buf + i_w + 1), buf,
sizeof(
int) * nword);
// Wait on the condition variable (presumably the path taken when the check
// above fails — confirm).
166 m_cond.wait(m_mutex);
// Each stored event advances the input word counter by nword + 1.
169 m_header->nword_in += nword + 1;
170 unsigned int count = ++m_header->count_in;
173 if (!unlocked) m_mutex.unlock();
// Copies words of one event out of the circular word buffer m_buf into `buf`
// and advances the output counters in the header.
//   buf      - destination for the copied words.
//   fouce    - when true, skips the initial wait on m_header->nwriter
//              (name is presumably a misspelling of "force").
//   unlocked - when true, this function neither locks nor unlocks m_mutex
//              (presumably the caller already holds it — TODO confirm).
// NOTE(review): the parameter list is cut off in this view, and parts of the
// body (how `nword` gets its value, loop/branch structure, the return) are
// not visible; the comments below describe only the visible statements.
177 unsigned int SharedEventBuffer::read(
int* buf,
bool fouce,
bool unlocked,
180 if (m_buf == NULL)
return 0;
181 if (!unlocked) m_mutex.lock();
// Unless forced, wait on the condition variable while nwriter is non-zero.
183 while (!fouce && m_header->nwriter > 0) {
184 m_cond.wait(m_mutex);
186 unsigned int i_w = 0;
187 unsigned int i_r = 0;
188 unsigned int nword = 0;
// Positions inside m_buf derived from the running word counters.
190 i_w = m_header->nword_in % m_nword;
191 i_r = m_header->nword_out % m_nword;
// Enough words are pending for an event of nword words plus one extra word.
194 if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
// Payload is read starting one word after i_r.
196 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * nword);
198 }
// Write position is numerically behind the read position.
else if (i_w < i_r) {
// More than nword words remain before the end of m_buf: single-piece copy.
199 if (m_nword - i_r > nword) {
200 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * nword);
// Two-part copy: `count` words from the read position, the rest from the
// start of m_buf.
// NOTE(review): starting at i_r + 1 and copying m_nword - i_r words would
// read one word past index m_nword - 1 — verify against the elided logic.
203 unsigned int count = m_nword - i_r;
204 memcpy(buf, (m_buf + i_r + 1),
sizeof(
int) * count);
206 memcpy(buf + count, m_buf,
sizeof(
int) * (nword - count));
// Wait on the condition variable (presumably the path taken when nothing
// could be read — confirm).
214 m_cond.wait(m_mutex);
// Each consumed event advances the output word counter by nword + 1.
217 m_header->nword_out += nword + 1;
218 unsigned int count = ++m_header->count_out;
224 if (!unlocked) m_mutex.unlock();