XRootD
XrdCephBuffer::XrdCephBufferAlgSimple Class Reference

#include <XrdCephBufferAlgSimple.hh>

+ Inheritance diagram for XrdCephBuffer::XrdCephBufferAlgSimple:
+ Collaboration diagram for XrdCephBuffer::XrdCephBufferAlgSimple:

Public Member Functions

 XrdCephBufferAlgSimple (std::unique_ptr< IXrdCephBufferData > buffer, std::unique_ptr< ICephIOAdapter > cephio, int fd, bool useStriperlessReads=true)
 
virtual ~XrdCephBufferAlgSimple ()
 
virtual IXrdCephBufferData * buffer ()
 
virtual const IXrdCephBufferData * buffer () const
 
virtual ssize_t flushWriteCache () override
 Write any buffered data to storage; call this after the final write. More...
 
virtual ssize_t read (volatile void *buff, off_t offset, size_t blen) override
 read data through the buffer More...
 
virtual ssize_t read_aio (XrdSfsAio *aoip) override
 AIO read entry point; currently served synchronously via read(). More...
 
virtual ssize_t write (const void *buff, off_t offset, size_t blen) override
 write data through the buffer More...
 
virtual ssize_t write_aio (XrdSfsAio *aoip) override
 AIO write entry point; currently served synchronously via write(). More...
 
- Public Member Functions inherited from XrdCephBuffer::IXrdCephBufferAlg
virtual ~IXrdCephBufferAlg ()
 

Protected Member Functions

virtual ssize_t rawRead (void *buff, off_t offset, size_t blen)
 
virtual ssize_t rawWrite (void *buff, off_t offset, size_t blen)
 

Detailed Description

Synchronous (non-AIO) buffering algorithm that uses a single buffer of fixed capacity. For reads: if the requested data is already in the buffer, it is copied out directly; if the buffer holds no useful data, the whole buffer is refilled from storage and the request is served from it. When a request extends past the buffered data, read() refills the buffer and continues, but it can still return fewer bytes than requested (for example near the end of the file), so callers should check the returned count and make an additional call for any data not returned. A return value of 0 should be taken to mean end of file.

Definition at line 27 of file XrdCephBufferAlgSimple.hh.

Constructor & Destructor Documentation

◆ XrdCephBufferAlgSimple()

XrdCephBufferAlgSimple::XrdCephBufferAlgSimple ( std::unique_ptr< IXrdCephBufferData > buffer,
std::unique_ptr< ICephIOAdapter > cephio,
int  fd,
bool  useStriperlessReads = true 
)

Definition at line 20 of file XrdCephBufferAlgSimple.cc.

22  :  // Member initialisation (the signature on .cc lines 20-21 is not part of this excerpt): takes over the buffer and the Ceph I/O adapter via std::move, and stores fd and the striperless-read flag
23 m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd),
24 m_useStriperlessReads(useStriperlessReads) {
25 
26 }
virtual const IXrdCephBufferData * buffer() const

◆ ~XrdCephBufferAlgSimple()

XrdCephBufferAlgSimple::~XrdCephBufferAlgSimple ( )
virtual

Definition at line 28 of file XrdCephBufferAlgSimple.cc.

28  {  // Destructor: logs this fd's buffer statistics via BUFLOG, then clears the stored descriptor
29  int prec = std::cout.precision();  // NOTE(review): precision is saved from std::cout - confirm BUFLOG writes to std::cout, otherwise setprecision(prec) below applies std::cout's value to a different stream
30  float bytesBuffered = m_stats_bytes_fromceph - m_stats_bytes_bypassed;  // bytes fetched from Ceph into the buffer, i.e. excluding bypassed (read-through) bytes
31  float cacheUseFraction = bytesBuffered > 0 ? (1.*(m_stats_bytes_toclient-m_stats_bytes_bypassed)/bytesBuffered) : 1. ;  // share of buffered bytes actually delivered to the client; 1 when nothing was buffered
32 
33  BUFLOG("XrdCephBufferAlgSimple::Destructor, fd=" << m_fd
34  << ", retrieved_bytes=" << m_stats_bytes_fromceph
35  << ", bypassed_bytes=" << m_stats_bytes_bypassed
36  << ", delivered_bytes=" << m_stats_bytes_toclient
37  << std::setprecision(4)
38  << ", cache_hit_frac=" << cacheUseFraction << std::setprecision(prec));  // restore the precision saved above
39  m_fd = -1;  // the descriptor is not closed here; only the stored value is cleared
40 }
#define BUFLOG(x)
Definition: BufferUtils.hh:23

References BUFLOG.

Member Function Documentation

◆ buffer() [1/2]

virtual IXrdCephBufferData* XrdCephBuffer::XrdCephBufferAlgSimple::buffer ( )
inlinevirtual

Definition at line 43 of file XrdCephBufferAlgSimple.hh.

43 {return m_bufferdata.get();}  // non-owning pointer to the buffer; m_bufferdata keeps ownership

◆ buffer() [2/2]

virtual const IXrdCephBufferData* XrdCephBuffer::XrdCephBufferAlgSimple::buffer ( ) const
inlinevirtual

Definition at line 42 of file XrdCephBufferAlgSimple.hh.

42 {return m_bufferdata.get();}  // read-only, non-owning pointer to the buffer; m_bufferdata keeps ownership

◆ flushWriteCache()

ssize_t XrdCephBufferAlgSimple::flushWriteCache ( )
overridevirtual

Write any buffered data to storage and reset the buffer; remember to call this after the final write.

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 354 of file XrdCephBufferAlgSimple.cc.

354  {  // Write any buffered bytes to storage via m_cephio, then reset the buffer; returns bytes written, 0 for an empty buffer, or a negative error code
355  // Set a lock for any attempt at a simultaneous operation
356  // Use recursive, as write (and read) also calls the lock and don't want to deadlock
357  const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); // write() already holds this mutex when it calls flushWriteCache()
358  // BUFLOG("flushWriteCache: " << m_bufferStartingOffset << " " << m_bufferLength);
359  ssize_t rc(-1);
360  if (m_bufferLength == 0) {
361  BUFLOG("Empty buffer to flush: ");
362  rc = 0; // not an issue
363  }
364 
365  if (m_bufferLength > 0) {
366  rc = m_cephio->write(m_bufferStartingOffset, m_bufferLength);  // presumably writes the first m_bufferLength buffered bytes at m_bufferStartingOffset - confirm against ICephIOAdapter
367  if (rc < 0) {
368  BUFLOG("WriteBuffer write step failed: " << rc);
369  }
370  } // some bytes to write
371 
372  // reset values
373  m_bufferLength=0;  // NOTE(review): the reset happens even when the write above failed, so buffered data is discarded on error (the error code is still returned)
374  m_bufferStartingOffset=0;
375  m_bufferdata->invalidate();
376  // return bytes written, or errorcode if failure
377  return rc;
378 }

References BUFLOG.

Referenced by write().

+ Here is the caller graph for this function:

◆ rawRead()

ssize_t XrdCephBufferAlgSimple::rawRead ( void *  buff,
off_t  offset,
size_t  blen 
)
protectedvirtual

Definition at line 381 of file XrdCephBufferAlgSimple.cc.

381  {  // Not implemented: always reports -ENOSYS; the arguments are ignored
382  return -ENOSYS;
383 }

◆ rawWrite()

ssize_t XrdCephBufferAlgSimple::rawWrite ( void *  buff,
off_t  offset,
size_t  blen 
)
protectedvirtual

Definition at line 385 of file XrdCephBufferAlgSimple.cc.

385  {  // Not implemented: always reports -ENOSYS; the arguments are ignored
386  return -ENOSYS;
387 }

◆ read()

ssize_t XrdCephBufferAlgSimple::read ( volatile void *  buff,
off_t  offset,
size_t  blen 
)
overridevirtual

read data through the buffer

If the requested read is at least as large as the buffer capacity, the cache is bypassed and the data is read directly from storage; the cache is invalidated in this case as well.

In principle only one loop iteration should be needed. However, when a read request crosses the end of the buffered data, two iterations are required: the first reads out the rest of the current buffer, and the second reads the remaining data from the refilled buffer.

If we need to load data in the cache, do it here.

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 90 of file XrdCephBufferAlgSimple.cc.

90  {  // Read up to blen bytes at offset into buf, served from the single buffer and refilled from m_cephio when needed; returns the number of bytes copied (0 at end of data) or a negative error code
91  // Set a lock for any attempt at a simultaneous operation
92  // Use recursive, as flushCache also calls the lock and don't want to deadlock
93  // No call to flushCache should happen in a read, but be consistent
94  // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
95  const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); // same mutex as write()/flushWriteCache(), so reads and writes on this object are serialised
96  // BUFLOG("XrdCephBufferAlgSimple::read: postLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
97 
98  // BUFLOG("XrdCephBufferAlgSimple::read status:"
99  // << "\n\tRead off/len/end: " << offset << "/" << blen << "/(" << (offset+blen) <<")"
100  // << "\n\tBuffer: start/length/end/cap: " << m_bufferStartingOffset << "/" << m_bufferLength << "/"
101  // << (m_bufferStartingOffset + m_bufferLength) << "/" << m_bufferdata->capacity()
102  // );
103  if (blen == 0) return 0;  // nothing requested
104 
109  if (blen >= m_bufferdata->capacity()) {  // requests at least as large as the buffer bypass it entirely
110  //BUFLOG("XrdCephBufferAlgSimple::read: Readthrough cache: fd: " << m_fd
111  // << " " << offset << " " << blen);
112  // larger than cache, so read through, and invalidate the cache anyway
113  m_bufferdata->invalidate();
114  m_bufferLength =0; // ensure cached data is set to zero length
115  // #FIXME JW: const_cast is probably a bit poor.
116 
117  ssize_t rc = ceph_posix_maybestriper_pread (m_fd, const_cast<void*>(buf), blen, offset, m_useStriperlessReads);  // const_cast drops the volatile qualifier, because the pread helper takes a plain void*
118  if (rc > 0) {
119  m_stats_bytes_fromceph += rc;
120  m_stats_bytes_toclient += rc;
121  m_stats_bytes_bypassed += rc;
122  }
123  return rc;
124  }
125 
126  ssize_t rc(-1);
127  size_t bytesRemaining = blen; // track how many bytes still need to be read
128  off_t offsetDelta = 0;  // bytes already copied into buf; the current file position is offset + offsetDelta
129  size_t bytesRead = 0;
135  while (bytesRemaining > 0) {
136  // BUFLOG("In loop: " << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining << " " << m_bufferLength);
137 
138  bool loadCache = false;
139  // run some checks to see if we need to fill the cache.
140  if (m_bufferLength == 0) {
141  // no data in buffer
142  loadCache = true;
143  } else if (offset < m_bufferStartingOffset) {  // NOTE(review): compares the original offset, not the current position offset + offsetDelta; after a mid-request refill (m_bufferStartingOffset > offset) any further iteration lands here and refills again, possibly with a redundant 0-byte read that also empties the buffer
144  // offset before any cache data
145  loadCache = true;
146  } else if (offset >= (off_t) (m_bufferStartingOffset + m_bufferLength) ) {  // also uses the original offset; the check at 149 is the one that tracks the current position
147  // offset is beyond the stored data
148  loadCache = true;
149  } else if ((offset - m_bufferStartingOffset + offsetDelta) >= (off_t)m_bufferLength) {
150  // we have now read to the end of the buffers data
151  loadCache = true;
152  }
153 
158  if (loadCache) {
159  // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << "Filling the cache");
160  m_bufferdata->invalidate();
161  m_bufferLength =0; // set length of data stored to 0
162  rc = m_cephio->read(offset + offsetDelta, m_bufferdata->capacity()); // fill the cache, starting at the current position
163  // BUFLOG("LoadCache ReadToCache: " << rc << " " << offset + offsetDelta << " " << m_bufferdata->capacity() );
164  if (rc < 0) {
165  BUFLOG("LoadCache Error: " << rc);
166  return rc;// TODO return correct errors
167  }
168  m_stats_bytes_fromceph += rc;
169  m_bufferStartingOffset = offset + offsetDelta;
170  m_bufferLength = rc;
171  if (rc == 0) {
172  // We should be at the end of file, with nothing more to read, and nothing that could be returned
173  // break out of the loop.
174  break;
175  }
176  }
177 
178 
179  //now read as much data as possible
180  off_t bufPosition = offset + offsetDelta - m_bufferStartingOffset;  // position of the current file offset within the buffer
181  rc = m_bufferdata->readBuffer( (void*) &(((char*)buf)[offsetDelta]) , bufPosition , bytesRemaining);  // copy into buf at offsetDelta; presumably returns fewer than bytesRemaining when the buffer holds less - confirm readBuffer semantics
182  // BUFLOG("Fill result: " << offsetDelta << " " << bufPosition << " " << bytesRemaining << " " << rc)
183  if (rc < 0 ) {
184  BUFLOG("Reading from Cache Failed: " << rc << " " << offset << " "
185  << offsetDelta << " " << m_bufferStartingOffset << " "
186  << bufPosition << " "
187  << bytesRemaining );
188  return rc; // TODO return correct errors
189  }
190  if (rc == 0) {
191  // no bytes returned; must be at end of file
192  //BUFLOG("No bytes returned: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
193  break; // leave the loop even though bytesRemaining is probably > 0.
194  //i.e. requested a full buffers worth, but only a fraction of the file is here.
195  }
196  m_stats_bytes_toclient += rc;
197  // BUFLOG("End of loop: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
198  offsetDelta += rc;
199  bytesRemaining -= rc;
200  bytesRead += rc;
201 
202  } // while bytesRemaining
203 
204  return bytesRead;  // may be less than blen when the end of data is reached
205 }
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper)

References BUFLOG, and ceph_posix_maybestriper_pread().

Referenced by read_aio().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ read_aio()

ssize_t XrdCephBufferAlgSimple::read_aio ( XrdSfsAio aoip)
overridevirtual

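AIO read entry point: the request is served synchronously through read(), after which Result is set and doneRead() is called.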

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 43 of file XrdCephBufferAlgSimple.cc.

43  {  // Serve an AIO read request synchronously via read(), store the result in aoip->Result and signal completion with doneRead(); returns read()'s result, or -EINVAL for a null request
44  // NOTE: the -ENOSYS stub below is disabled; AIO reads are now served synchronously via read() rather than rejected
45  //return -ENOSYS;
46 
47  ssize_t rc(-ENOSYS);  // overwritten by read() below
48  if (!aoip) {
49  return -EINVAL;
50  }
51 
52  volatile void * buf = aoip->sfsAio.aio_buf;
53  size_t blen = aoip->sfsAio.aio_nbytes;
54  off_t offset = aoip->sfsAio.aio_offset;
55 
56  // translate the aio read into a simple sync read.
57  // hopefully don't get too many out of sequence reads to affect the caching
58  rc = read(buf, offset, blen);
59 
60  aoip->Result = rc;
61  aoip->doneRead();
62 
63  return rc;
64 
65 }
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
virtual ssize_t read(volatile void *buff, off_t offset, size_t blen) override
read data through the buffer
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62

References aiocb::aio_buf, aiocb::aio_nbytes, aiocb::aio_offset, XrdSfsAio::doneRead(), read(), XrdSfsAio::Result, and XrdSfsAio::sfsAio.

+ Here is the call graph for this function:

◆ write()

ssize_t XrdCephBufferAlgSimple::write ( const void *  buff,
off_t  offset,
size_t  blen 
)
overridevirtual

write data through the buffer

Writes are expected to arrive in order. The expected offset is the end of the buffered data (m_bufferStartingOffset + m_bufferLength), and it is compared against the offset provided; m_bufferStartingOffset is the offset in Ceph that buffer[0] represents.

We should be equally careful if the offset of the buffer start is not sensibly aligned. For now this is only logged, but #TODO: to be cautious, this should become an error condition.

Provide some sanity checking for the write to the buffer. These conditions are treated as errors, as there is no immediate satisfactory solution.

< tracks how many bytes remain to be written

Typically only one loop iteration is expected, i.e. the write request is smaller than the buffer. If the request is larger, or straddles the end of the buffer, further iterations are needed.

If the cache is already full, flush it to storage now.

Check again whether the cache is full and, if so, write its data to storage.

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 207 of file XrdCephBufferAlgSimple.cc.

207  {  // Append blen bytes from buf (for file offset `offset`) to the write buffer, flushing via flushWriteCache() whenever it fills; while the buffer holds data only the next sequential offset is accepted; returns blen on success or a negative error code
208  // Set a lock for any attempt at a simultaneous operation
209  // Use recursive, as flushCache also calls the lock and don't want to deadlock
210  const std::lock_guard<std::recursive_mutex> lock(m_data_mutex);
211 
212  // take the data in buf and put it into the cache; when the cache is full, write to underlying storage
213  // remember to flush the cache at the end of operations ...
214  ssize_t rc(-1);
215  ssize_t bytesWrittenToStorage(0);  // only consumed by the commented-out log at the end
216 
217  if (blen == 0) {
218  return 0; // nothing to write; are we done?
219  }
220 
227  off_t expected_offset = (off_t)(m_bufferStartingOffset + m_bufferLength);  // end of the data currently buffered
228 
229  if ((offset != expected_offset) && (m_bufferLength > 0) ) {  // NOTE(review): this branch only logs; the identical condition is rejected with -EINVAL at line 278 below
230  // for the moment we just log that there is some non expected offset value
231  // TODO, might be dangerous to flush the cache on non-aligned writes ...
232  BUFLOG("Non expected offset: " << rc << " " << offset << " " << expected_offset);  // NOTE(review): rc is still -1 here, so it adds nothing to this message
233  // rc = flushWriteCache();
234  // if (rc < 0) {
235  // return rc; // TODO return correct errors
236  // }
237  } // mismatched offset
238 
241  if ( (m_bufferStartingOffset % m_bufferdata->capacity()) != 0 ) {  // alignment is only logged, never rejected
242  BUFLOG(" Non aligned offset?" << m_bufferStartingOffset << " "
243  << m_bufferdata->capacity() << " " << m_bufferStartingOffset % m_bufferdata->capacity() );
244  }
245 
246  // Commented out below. It would be good to pass writes, which are larger than the buffer size,
247  // straight-through. However if the ranges are not well aligned, this could be an issue.
248  // And, what then to do about a possible partial filled buffer?
249 
250  // if (blen >= m_bufferdata->capacity()) {
251  // // TODO, might be dangerous to flush the cache on non-aligned writes ...
252  // // flush the cache now, if needed
253  // rc = flushWriteCache();
254  // if (rc < 0) {
255  // return rc; // TODO return correct errors
256  // }
257  // bytesWrittenToStorage += rc;
258 
259  // // Size is larger than the buffer; send the write straight through
260  // std::clog << "XrdCephBufferAlgSimple::write: Readthrough cache: fd: " << m_fd
261  // << " " << offset << " " << blen << std::endl;
262  // // larger than cache, so read through, and invalidate the cache anyway
263  // m_bufferdata->invalidate();
264  // m_bufferLength=0;
265  // m_bufferStartingOffset=0;
266  // rc = ceph_posix_pwrite(m_fd, buf, blen, offset);
267  // if (rc < 0) {
268  // return rc; // TODO return correct errors
269  // }
270  // bytesWrittenToStorage += rc;
271  // return rc;
272  // }
273 
278  if ((offset != expected_offset) && (m_bufferLength > 0) ) {  // out-of-order write while data is buffered: reject
279  BUFLOG("Error trying to write out of order: expeted at: " << expected_offset
280  << " got offset" << offset << " of len " << blen);
281  return -EINVAL;
282  }
283  if (offset < 0) {
284  BUFLOG("Got a negative offset: " << offset);
285  return -EINVAL;
286  }
287 
288 
289  size_t bytesRemaining = blen;
290  size_t bytesWritten = 0;  // bytes of buf already copied into the buffer
291 
295  while (bytesRemaining > 0) {
299  if (m_bufferLength == m_bufferdata->capacity()) {  // buffer full: flush before copying more
300  rc = flushWriteCache();
301  if (rc < 0) {
302  return rc;
303  }
304  bytesWrittenToStorage += rc;
305  } // at capacity;
306 
307  if (m_bufferLength == 0) {
308  // cache is currently empty, so set the 'reference' to the external offset now
309  m_bufferStartingOffset = offset + bytesWritten;
310  }
311  //add data to the cache from buf, from buf[bytesWritten] to the cache at position m_bufferLength
312  // make sure to write only as many bytes as left in the cache.
313  size_t nBytesToWrite = std::min(bytesRemaining, m_bufferdata->capacity()-m_bufferLength);
314  const void* bufAtOffset = (void*)((char*)buf + bytesWritten); // nasty cast as void* doesn't do arithmetic; NOTE(review): the C-style cast drops const - static_cast<const char*>(buf) + bytesWritten would keep it
315  if (nBytesToWrite == 0) {
316  BUFLOG( "Wanting to write 0 bytes; why is that?");
317  }
318  rc = m_bufferdata->writeBuffer(bufAtOffset, m_bufferLength, nBytesToWrite, 0);
319  if (rc < 0) {
320  BUFLOG( "WriteBuffer step failed: " << rc << " " << m_bufferLength << " " << blen << " " << offset );
321  return rc; // pass the error condition upwards
322  }
323  if (rc != (ssize_t)nBytesToWrite) {
324  BUFLOG( "WriteBuffer returned unexpected number of bytes: " << rc << " Expected: " << nBytesToWrite << " "
325  << m_bufferLength << " " << blen << " " << offset );
326  return -EBADE; // is bad exchange error best errno here?
327  }
328 
329  // lots of repetition here; #TODO try to reduce
330  m_bufferLength += rc;
331  bytesWritten += rc;
332  bytesRemaining -= rc;
333 
334  } // while bytesRemaining
335 
339  if (m_bufferLength == m_bufferdata->capacity()){  // flush eagerly if this write left the buffer exactly full
340  rc = flushWriteCache();
341  if (rc < 0)
342  {
343  return rc; // TODO return correct errors
344  }
345  bytesWrittenToStorage += rc;
346  } // at capacity;
347 
348  //BUFLOG( "WriteBuffer " << bytesWritten << " " << bytesWrittenToStorage << " " << offset << " " << blen << " " );
349  return bytesWritten;  // bytes accepted from the caller (equal to blen here)
350 }
virtual ssize_t flushWriteCache() override
remember to flush the cache on final writes

References BUFLOG, and flushWriteCache().

Referenced by write_aio().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ write_aio()

ssize_t XrdCephBufferAlgSimple::write_aio ( XrdSfsAio aoip)
overridevirtual

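AIO write entry point: the request is performed synchronously through write(), after which Result is set and doneWrite() is called.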

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 67 of file XrdCephBufferAlgSimple.cc.

67  {  // Serve an AIO write request synchronously via write(), store the result in aoip->Result and signal completion with doneWrite(); returns write()'s result, or -EINVAL for a null request
68  // NOTE: the -ENOSYS stub below is disabled; AIO writes are now served synchronously via write() rather than rejected
69  // return -ENOSYS;
70 
71  ssize_t rc(-ENOSYS);  // overwritten by write() below
72  if (!aoip) {
73  return -EINVAL;
74  }
75 
76  // volatile void * buf = aoip->sfsAio.aio_buf;
77  // size_t blen = aoip->sfsAio.aio_nbytes;
78  // off_t offset = aoip->sfsAio.aio_offset;
79  size_t blen = aoip->sfsAio.aio_nbytes;
80  off_t offset = aoip->sfsAio.aio_offset;
81 
82  rc = write(const_cast<const void*>(aoip->sfsAio.aio_buf), offset, blen);  // const_cast adapts aio_buf to write()'s const void* parameter (and would drop a volatile qualifier if aio_buf carries one)
83  aoip->Result = rc;
84  aoip->doneWrite();
85  return rc;
86 
87 }
virtual ssize_t write(const void *buff, off_t offset, size_t blen) override
write data through the buffer
virtual void doneWrite()=0

References aiocb::aio_buf, aiocb::aio_nbytes, aiocb::aio_offset, XrdSfsAio::doneWrite(), XrdSfsAio::Result, XrdSfsAio::sfsAio, and write().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: