Belle II Software development
SharedEventBuffer Class Reference

Classes

struct  Header
 

Public Member Functions

size_t size () throw ()
 
bool open (const std::string &nodename, size_t nword, bool recreate=false)
 
bool init ()
 
bool close ()
 
bool unlink ()
 
bool lock () throw ()
 
bool unlock () throw ()
 
bool wait () throw ()
 
bool wait (int time) throw ()
 
bool notify () throw ()
 
void clear ()
 
bool isOpened ()
 
const std::string getPath () const throw ()
 
HeadergetHeader () throw ()
 
int * getBuffer () throw ()
 
bool isWritable (int nword) throw ()
 
bool isReadable (int nword) throw ()
 
unsigned int write (const int *buf, unsigned int nword, bool fouce, unsigned int serial=0)
 
unsigned int write (const int *buf, unsigned int nword, bool fouce, unsigned int serial, bool unlocked)
 
unsigned int read (int *buf, bool fouce, bool unlocked, Header *hdr=NULL)
 

Private Attributes

std::string m_path
 
SharedMemory m_memory
 
MMutex m_mutex
 
MCond m_cond
 
Headerm_header
 
int * m_buf
 
unsigned int m_nword
 

Detailed Description

Definition at line 21 of file SharedEventBuffer.h.

Constructor & Destructor Documentation

◆ SharedEventBuffer()

SharedEventBuffer ( )
inline

Definition at line 37 of file SharedEventBuffer.h.

38 {
39 m_buf = NULL;
40 m_nword = 0;
41 }

◆ ~SharedEventBuffer()

~SharedEventBuffer ( )
inline

Definition at line 42 of file SharedEventBuffer.h.

43 {
44 if (m_buf != NULL) m_memory.close();
45 }

Member Function Documentation

◆ clear()

void clear ( )

Definition at line 64 of file SharedEventBuffer.cc.

65{
66 if (m_buf == NULL) return;
67 m_mutex.lock();
68 memset(m_header, 0, sizeof(Header));
69 for (unsigned long long i = 0; i < m_nword; i++) {
70 m_buf[i] = 0;
71 }
72 m_mutex.unlock();
73}

◆ close()

bool close ( )

Definition at line 75 of file SharedEventBuffer.cc.

76{
77 m_memory.close();
78 return true;
79}

◆ getBuffer()

int * getBuffer ( )
throw (
)
inline

Definition at line 65 of file SharedEventBuffer.h.

65{ return m_buf; }

◆ getHeader()

Header * getHeader ( )
throw (
)
inline

Definition at line 64 of file SharedEventBuffer.h.

64{ return m_header; }

◆ getPath()

const std::string getPath ( ) const
throw (
)
inline

Definition at line 63 of file SharedEventBuffer.h.

63{ return m_path; }

◆ init()

bool init ( )

Definition at line 51 of file SharedEventBuffer.cc.

52{
53 if (m_buf == NULL) return false;
54 m_mutex.init();
55 m_cond.init();
56 memset(m_header, 0, sizeof(Header));
57 memset(m_buf, 0, sizeof(int) * m_nword);
58 //for (unsigned long long i = 0; i < m_nword; i++) {
59 // m_buf[i] = 0;
60 //}
61 return true;
62}

◆ isOpened()

bool isOpened ( )
inline

Definition at line 60 of file SharedEventBuffer.h.

60{ return m_memory.isOpened(); }

◆ isReadable()

bool isReadable ( int  nword)
throw (
)

Definition at line 127 of file SharedEventBuffer.cc.

128{
129 if (m_buf == NULL) return false;
130 m_mutex.lock();
131 bool readable = m_header->nword_in - m_header->nword_out >= m_nword - (nword + 1);
132 m_mutex.unlock();
133 return readable;
134
135}

◆ isWritable()

bool isWritable ( int  nword)
throw (
)

Definition at line 118 of file SharedEventBuffer.cc.

119{
120 if (m_buf == NULL) return false;
121 m_mutex.lock();
122 bool writable = m_header->nword_in - m_header->nword_out < m_nword - (nword + 1);
123 m_mutex.unlock();
124 return writable;
125}

◆ lock()

bool lock ( )
throw (
)

Definition at line 88 of file SharedEventBuffer.cc.

89{
90 if (m_buf == NULL) return false;
91 return m_mutex.lock();
92}

◆ notify()

bool notify ( )
throw (
)

Definition at line 112 of file SharedEventBuffer.cc.

113{
114 if (m_buf == NULL) return false;
115 return m_cond.broadcast();
116}

◆ open()

bool open ( const std::string &  nodename,
size_t  nword,
bool  recreate = false 
)

Definition at line 23 of file SharedEventBuffer.cc.

25{
26 m_nword = nword;
27 std::string username = getenv("USER");
28 m_path = "/storage_info_" + username + "_" + nodename;
29 LogFile::debug("%s", m_path.c_str());
30 //if (recreate) SharedMemory::unlink(m_path);
31 if (!m_memory.open(m_path, size())) {
32 perror("shm_open");
33 LogFile::fatal("Failed to open %s", m_path.c_str());
34 return false;
35 }
36 char* buf = (char*) m_memory.map(0, size());
37 if (buf == NULL) {
38 return false;
39 }
40 m_mutex = MMutex(buf);
41 buf += m_mutex.size();
42 m_cond = MCond(buf);
43 buf += m_cond.size();
44 m_header = reinterpret_cast<Header*>(buf);
45 buf += sizeof(Header);
46 m_buf = (int*)buf;
47 if (recreate) init();
48 return true;
49}

◆ read()

unsigned int read ( int *  buf,
bool  fouce,
bool  unlocked,
SharedEventBuffer::Header hdr = NULL 
)

Definition at line 183 of file SharedEventBuffer.cc.

185{
186 if (m_buf == NULL) return 0;
187 if (!unlocked) m_mutex.lock();
188 m_header->nreader++;
189 while (!fouce && m_header->nwriter > 0) {
190 m_cond.wait(m_mutex);
191 }
192
193 unsigned int nword = 0;
194 while (true) {
195 unsigned int i_w = m_header->nword_in % m_nword;
196 unsigned int i_r = m_header->nword_out % m_nword;
197 nword = m_buf[i_r];
198 if (nword > 0) {
199 if (m_header->nword_in - m_header->nword_out >= (nword + 1)) {
200 if (i_w > i_r) {
201 memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
202 break;
203 } else if (i_w < i_r) {
204 if (m_nword - i_r > nword) {
205 memcpy(buf, (m_buf + i_r + 1), sizeof(int) * nword);
206 break;
207 } else {
208 unsigned int count = m_nword - i_r;
209 memcpy(buf, (m_buf + i_r + 1), sizeof(int) * count);
210 if (nword > count) {
211 memcpy(buf + count, m_buf, sizeof(int) * (nword - count));
212 }
213 break;
214 }
215 }
216 }
217 }
218 m_header->nreader--;
219 m_cond.wait(m_mutex);
220 m_header->nreader++;
221 }
222 m_header->nword_out += nword + 1;
223 unsigned int count = ++m_header->count_out;
224 m_header->nreader--;
225 if (hdr != NULL) {
226 memcpy(hdr, m_header, sizeof(SharedEventBuffer::Header));
227 }
228 m_cond.broadcast();
229 if (!unlocked) m_mutex.unlock();
230 return count;
231}

◆ size()

size_t size ( )
throw (
)

Definition at line 17 of file SharedEventBuffer.cc.

18{
19 return m_mutex.size() + m_cond.size() +
20 sizeof(Header) + sizeof(int) * (m_nword);
21}

◆ unlink()

bool unlink ( )

Definition at line 81 of file SharedEventBuffer.cc.

82{
83 m_memory.close();
84 m_memory.unlink();
85 return true;
86}

◆ unlock()

bool unlock ( )
throw (
)

Definition at line 94 of file SharedEventBuffer.cc.

95{
96 if (m_buf == NULL) return false;
97 return m_mutex.unlock();
98}

◆ wait() [1/2]

bool wait ( )
throw (
)

Definition at line 100 of file SharedEventBuffer.cc.

101{
102 if (m_buf == NULL) return false;
103 return m_cond.wait(m_mutex);
104}

◆ wait() [2/2]

bool wait ( int  time)
throw (
)

Definition at line 106 of file SharedEventBuffer.cc.

107{
108 if (m_buf == NULL) return false;
109 return m_cond.wait(m_mutex, time, 0);
110}

◆ write() [1/2]

unsigned int write ( const int *  buf,
unsigned int  nword,
bool  fouce,
unsigned int  serial,
bool  unlocked 
)

Definition at line 137 of file SharedEventBuffer.cc.

139{
140 if (m_buf == NULL) return 0;
141 if (nword == 0) return 0;
142 if (nword > m_nword) return -1;
143 if (!unlocked) m_mutex.lock();
144 m_header->nwriter++;
145 while (!fouce && m_header->nreader > 0) {
146 m_cond.wait(m_mutex);
147 }
148
149 while (true) {
150 unsigned int i_w = m_header->nword_in % m_nword;
151 unsigned int i_r = m_header->nword_out % m_nword;
152 if ((serial == 0 || serial - 1 == m_header->count_in) &&
153 m_header->nword_in - m_header->nword_out < m_nword - (nword + 1)) {
154 if (i_w >= i_r) {
155 unsigned int count = m_nword - i_w;
156 if (nword + 1 < count) {
157 m_buf[i_w] = nword;
158 memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
159 } else {
160 m_buf[i_w] = nword;
161 memcpy((m_buf + i_w + 1), buf, sizeof(int) * count);
162 if (nword >= count)
163 memcpy(m_buf, buf + count, sizeof(int) * (nword - count));
164 }
165 } else {
166 m_buf[i_w] = nword;
167 memcpy((m_buf + i_w + 1), buf, sizeof(int) * nword);
168 }
169 break;
170 }
171 m_header->nwriter--;
172 m_cond.wait(m_mutex);
173 m_header->nwriter++;
174 }
175 m_header->nword_in += nword + 1;
176 unsigned int count = ++m_header->count_in;
177 m_header->nwriter--;
178 m_cond.broadcast();
179 if (!unlocked) m_mutex.unlock();
180 return count;
181}

◆ write() [2/2]

unsigned int write ( const int *  buf,
unsigned int  nword,
bool  fouce,
unsigned int  serial = 0 
)
inline

Definition at line 68 of file SharedEventBuffer.h.

70 {
71 return write(buf, nword, fouce, serial, false);
72 }

Member Data Documentation

◆ m_buf

int* m_buf
private

Definition at line 83 of file SharedEventBuffer.h.

◆ m_cond

MCond m_cond
private

Definition at line 81 of file SharedEventBuffer.h.

◆ m_header

Header* m_header
private

Definition at line 82 of file SharedEventBuffer.h.

◆ m_memory

SharedMemory m_memory
private

Definition at line 79 of file SharedEventBuffer.h.

◆ m_mutex

MMutex m_mutex
private

Definition at line 80 of file SharedEventBuffer.h.

◆ m_nword

unsigned int m_nword
private

Definition at line 84 of file SharedEventBuffer.h.

◆ m_path

std::string m_path
private

Definition at line 78 of file SharedEventBuffer.h.


The documentation for this class was generated from the following files: