XRootD
XrdCephBufferAlgSimple.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 //------------------------------------------------------------------------------
3 
4 #include <sys/types.h>
6 
7 #include "../XrdCephPosix.hh"
8 #include <XrdOuc/XrdOucEnv.hh>
9 #include <fcntl.h>
10 #include <sys/stat.h>
11 #include <iostream>
12 #include <thread>
13 
14 #include "XrdSfs/XrdSfsAio.hh"
15 
16 
17 using namespace XrdCephBuffer;
18 
19 
20 XrdCephBufferAlgSimple::XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer,
21  std::unique_ptr<ICephIOAdapter> cephio, int fd,
22  bool useStriperlessReads):
23 m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd),
24 m_useStriperlessReads(useStriperlessReads) {
25 
26 }
27 
29  int prec = std::cout.precision();
30  float bytesBuffered = m_stats_bytes_fromceph - m_stats_bytes_bypassed;
31  float cacheUseFraction = bytesBuffered > 0 ? (1.*(m_stats_bytes_toclient-m_stats_bytes_bypassed)/bytesBuffered) : 1. ;
32 
33  BUFLOG("XrdCephBufferAlgSimple::Destructor, fd=" << m_fd
34  << ", retrieved_bytes=" << m_stats_bytes_fromceph
35  << ", bypassed_bytes=" << m_stats_bytes_bypassed
36  << ", delivered_bytes=" << m_stats_bytes_toclient
37  << std::setprecision(4)
38  << ", cache_hit_frac=" << cacheUseFraction << std::setprecision(prec));
39  m_fd = -1;
40 }
41 
42 
44  // Currently this is not supported, and callers using this should receive the appropriate error code
45  //return -ENOSYS;
46 
47  ssize_t rc(-ENOSYS);
48  if (!aoip) {
49  return -EINVAL;
50  }
51 
52  volatile void * buf = aoip->sfsAio.aio_buf;
53  size_t blen = aoip->sfsAio.aio_nbytes;
54  off_t offset = aoip->sfsAio.aio_offset;
55 
56  // translate the aio read into a simple sync read.
57  // hopefully don't get too many out of sequence reads to effect the caching
58  rc = read(buf, offset, blen);
59 
60  aoip->Result = rc;
61  aoip->doneRead();
62 
63  return rc;
64 
65 }
66 
68  // Currently this is not supported, and callers using this should receive the appropriate error code
69  // return -ENOSYS;
70 
71  ssize_t rc(-ENOSYS);
72  if (!aoip) {
73  return -EINVAL;
74  }
75 
76  // volatile void * buf = aoip->sfsAio.aio_buf;
77  // size_t blen = aoip->sfsAio.aio_nbytes;
78  // off_t offset = aoip->sfsAio.aio_offset;
79  size_t blen = aoip->sfsAio.aio_nbytes;
80  off_t offset = aoip->sfsAio.aio_offset;
81 
82  rc = write(const_cast<const void*>(aoip->sfsAio.aio_buf), offset, blen);
83  aoip->Result = rc;
84  aoip->doneWrite();
85  return rc;
86 
87 }
88 
89 
90 ssize_t XrdCephBufferAlgSimple::read(volatile void *buf, off_t offset, size_t blen) {
91  // Set a lock for any attempt at a simultaneous operation
92  // Use recursive, as flushCache also calls the lock and don't want to deadlock
93  // No call to flushCache should happen in a read, but be consistent
94  // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
95  const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); //
96  // BUFLOG("XrdCephBufferAlgSimple::read: postLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
97 
98  // BUFLOG("XrdCephBufferAlgSimple::read status:"
99  // << "\n\tRead off/len/end: " << offset << "/" << blen << "/(" << (offset+blen) <<")"
100  // << "\n\tBuffer: start/length/end/cap: " << m_bufferStartingOffset << "/" << m_bufferLength << "/"
101  // << (m_bufferStartingOffset + m_bufferLength) << "/" << m_bufferdata->capacity()
102  // );
103  if (blen == 0) return 0;
104 
109  if (blen >= m_bufferdata->capacity()) {
110  //BUFLOG("XrdCephBufferAlgSimple::read: Readthrough cache: fd: " << m_fd
111  // << " " << offset << " " << blen);
112  // larger than cache, so read through, and invalidate the cache anyway
113  m_bufferdata->invalidate();
114  m_bufferLength =0; // ensure cached data is set to zero length
115  // #FIXME JW: const_cast is probably a bit poor.
116 
117  ssize_t rc = ceph_posix_maybestriper_pread (m_fd, const_cast<void*>(buf), blen, offset, m_useStriperlessReads);
118  if (rc > 0) {
119  m_stats_bytes_fromceph += rc;
120  m_stats_bytes_toclient += rc;
121  m_stats_bytes_bypassed += rc;
122  }
123  return rc;
124  }
125 
126  ssize_t rc(-1);
127  size_t bytesRemaining = blen; // track how many bytes still need to be read
128  off_t offsetDelta = 0;
129  size_t bytesRead = 0;
135  while (bytesRemaining > 0) {
136  // BUFLOG("In loop: " << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining << " " << m_bufferLength);
137 
138  bool loadCache = false;
139  // run some checks to see if we need to fill the cache.
140  if (m_bufferLength == 0) {
141  // no data in buffer
142  loadCache = true;
143  } else if (offset < m_bufferStartingOffset) {
144  // offset before any cache data
145  loadCache = true;
146  } else if (offset >= (off_t) (m_bufferStartingOffset + m_bufferLength) ) {
147  // offset is beyond the stored data
148  loadCache = true;
149  } else if ((offset - m_bufferStartingOffset + offsetDelta) >= (off_t)m_bufferLength) {
150  // we have now read to the end of the buffers data
151  loadCache = true;
152  }
153 
158  if (loadCache) {
159  // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << "Filling the cache");
160  m_bufferdata->invalidate();
161  m_bufferLength =0; // set lengh of data stored to 0
162  rc = m_cephio->read(offset + offsetDelta, m_bufferdata->capacity()); // fill the cache
163  // BUFLOG("LoadCache ReadToCache: " << rc << " " << offset + offsetDelta << " " << m_bufferdata->capacity() );
164  if (rc < 0) {
165  BUFLOG("LoadCache Error: " << rc);
166  return rc;// TODO return correct errors
167  }
168  m_stats_bytes_fromceph += rc;
169  m_bufferStartingOffset = offset + offsetDelta;
170  m_bufferLength = rc;
171  if (rc == 0) {
172  // We should be at the end of file, with nothing more to read, and nothing that could be returned
173  // break out of the loop.
174  break;
175  }
176  }
177 
178 
179  //now read as much data as possible
180  off_t bufPosition = offset + offsetDelta - m_bufferStartingOffset;
181  rc = m_bufferdata->readBuffer( (void*) &(((char*)buf)[offsetDelta]) , bufPosition , bytesRemaining);
182  // BUFLOG("Fill result: " << offsetDelta << " " << bufPosition << " " << bytesRemaining << " " << rc)
183  if (rc < 0 ) {
184  BUFLOG("Reading from Cache Failed: " << rc << " " << offset << " "
185  << offsetDelta << " " << m_bufferStartingOffset << " "
186  << bufPosition << " "
187  << bytesRemaining );
188  return rc; // TODO return correct errors
189  }
190  if (rc == 0) {
191  // no bytes returned; much be at end of file
192  //BUFLOG("No bytes returned: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
193  break; // leave the loop even though bytesremaing is probably >=0.
194  //i.e. requested a full buffers worth, but only a fraction of the file is here.
195  }
196  m_stats_bytes_toclient += rc;
197  // BUFLOG("End of loop: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
198  offsetDelta += rc;
199  bytesRemaining -= rc;
200  bytesRead += rc;
201 
202  } // while bytesremaing
203 
204  return bytesRead;
205 }
206 
207 ssize_t XrdCephBufferAlgSimple::write (const void *buf, off_t offset, size_t blen) {
208  // Set a lock for any attempt at a simultaneous operation
209  // Use recursive, as flushCache also calls the lock and don't want to deadlock
210  const std::lock_guard<std::recursive_mutex> lock(m_data_mutex);
211 
212  // take the data in buf and put it into the cache; when the cache is full, write to underlying storage
213  // remember to flush the cache at the end of operations ...
214  ssize_t rc(-1);
215  ssize_t bytesWrittenToStorage(0);
216 
217  if (blen == 0) {
218  return 0; // nothing to write; are we done?
219  }
220 
227  off_t expected_offset = (off_t)(m_bufferStartingOffset + m_bufferLength);
228 
229  if ((offset != expected_offset) && (m_bufferLength > 0) ) {
230  // for the moment we just log that there is some non expected offset value
231  // TODO, might be dangerous to flush the cache on non-aligned writes ...
232  BUFLOG("Non expected offset: " << rc << " " << offset << " " << expected_offset);
233  // rc = flushWriteCache();
234  // if (rc < 0) {
235  // return rc; // TODO return correct errors
236  // }
237  } // mismatched offset
238 
241  if ( (m_bufferStartingOffset % m_bufferdata->capacity()) != 0 ) {
242  BUFLOG(" Non aligned offset?" << m_bufferStartingOffset << " "
243  << m_bufferdata->capacity() << " " << m_bufferStartingOffset % m_bufferdata->capacity() );
244  }
245 
246  // Commmented out below. It would be good to pass writes, which are larger than the buffer size,
247  // straight-through. However if the ranges are not well aligned, this could be an issue.
248  // And, what then to do about a possible partial filled buffer?
249 
250  // if (blen >= m_bufferdata->capacity()) {
251  // // TODO, might be dangerous to flush the cache on non-aligned writes ...
252  // // flush the cache now, if needed
253  // rc = flushWriteCache();
254  // if (rc < 0) {
255  // return rc; // TODO return correct errors
256  // }
257  // bytesWrittenToStorage += rc;
258 
259  // // Size is larger than the buffer; send the write straight through
260  // std::clog << "XrdCephBufferAlgSimple::write: Readthrough cache: fd: " << m_fd
261  // << " " << offset << " " << blen << std::endl;
262  // // larger than cache, so read through, and invalidate the cache anyway
263  // m_bufferdata->invalidate();
264  // m_bufferLength=0;
265  // m_bufferStartingOffset=0;
266  // rc = ceph_posix_pwrite(m_fd, buf, blen, offset);
267  // if (rc < 0) {
268  // return rc; // TODO return correct errors
269  // }
270  // bytesWrittenToStorage += rc;
271  // return rc;
272  // }
273 
278  if ((offset != expected_offset) && (m_bufferLength > 0) ) {
279  BUFLOG("Error trying to write out of order: expeted at: " << expected_offset
280  << " got offset" << offset << " of len " << blen);
281  return -EINVAL;
282  }
283  if (offset < 0) {
284  BUFLOG("Got a negative offset: " << offset);
285  return -EINVAL;
286  }
287 
288 
289  size_t bytesRemaining = blen;
290  size_t bytesWritten = 0;
291 
295  while (bytesRemaining > 0) {
299  if (m_bufferLength == m_bufferdata->capacity()) {
300  rc = flushWriteCache();
301  if (rc < 0) {
302  return rc;
303  }
304  bytesWrittenToStorage += rc;
305  } // at capacity;
306 
307  if (m_bufferLength == 0) {
308  // cache is currently empty, so set the 'reference' to the external offset now
309  m_bufferStartingOffset = offset + bytesWritten;
310  }
311  //add data to the cache from buf, from buf[offsetDelta] to the cache at position m_bufferLength
312  // make sure to write only as many bytes as left in the cache.
313  size_t nBytesToWrite = std::min(bytesRemaining, m_bufferdata->capacity()-m_bufferLength);
314  const void* bufAtOffset = (void*)((char*)buf + bytesWritten); // nasty cast as void* doesn't do arithmetic
315  if (nBytesToWrite == 0) {
316  BUFLOG( "Wanting to write 0 bytes; why is that?");
317  }
318  rc = m_bufferdata->writeBuffer(bufAtOffset, m_bufferLength, nBytesToWrite, 0);
319  if (rc < 0) {
320  BUFLOG( "WriteBuffer step failed: " << rc << " " << m_bufferLength << " " << blen << " " << offset );
321  return rc; // pass the error condidition upwards
322  }
323  if (rc != (ssize_t)nBytesToWrite) {
324  BUFLOG( "WriteBuffer returned unexpected number of bytes: " << rc << " Expected: " << nBytesToWrite << " "
325  << m_bufferLength << " " << blen << " " << offset );
326  return -EBADE; // is bad exchange error best errno here?
327  }
328 
329  // lots of repetition here; #TODO try to reduce
330  m_bufferLength += rc;
331  bytesWritten += rc;
332  bytesRemaining -= rc;
333 
334  } // while byteRemaining
335 
339  if (m_bufferLength == m_bufferdata->capacity()){
340  rc = flushWriteCache();
341  if (rc < 0)
342  {
343  return rc; // TODO return correct errors
344  }
345  bytesWrittenToStorage += rc;
346  } // at capacity;
347 
348  //BUFLOG( "WriteBuffer " << bytesWritten << " " << bytesWrittenToStorage << " " << offset << " " << blen << " " );
349  return bytesWritten;
350 }
351 
352 
353 
355  // Set a lock for any attempt at a simultaneous operation
356  // Use recursive, as write (and read) also calls the lock and don't want to deadlock
357  const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); //
358  // BUFLOG("flushWriteCache: " << m_bufferStartingOffset << " " << m_bufferLength);
359  ssize_t rc(-1);
360  if (m_bufferLength == 0) {
361  BUFLOG("Empty buffer to flush: ");
362  rc = 0; // not an issue
363  }
364 
365  if (m_bufferLength > 0) {
366  rc = m_cephio->write(m_bufferStartingOffset, m_bufferLength);
367  if (rc < 0) {
368  BUFLOG("WriteBuffer write step failed: " << rc);
369  }
370  } // some bytes to write
371 
372  // reset values
373  m_bufferLength=0;
374  m_bufferStartingOffset=0;
375  m_bufferdata->invalidate();
376  // return bytes written, or errorcode if failure
377  return rc;
378 }
379 
380 
381 ssize_t XrdCephBufferAlgSimple::rawRead (void *buf, off_t offset, size_t blen) {
382  return -ENOSYS;
383 }
384 
385 ssize_t XrdCephBufferAlgSimple::rawWrite(void *buf, off_t offset, size_t blen) {
386  return -ENOSYS;
387 }
#define BUFLOG(x)
Definition: BufferUtils.hh:23
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper)
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
virtual ssize_t write_aio(XrdSfsAio *aoip) override
possible aio based code
virtual ssize_t rawWrite(void *buff, off_t offset, size_t blen)
virtual ssize_t flushWriteCache() override
remember to flush the cache on final writes
XrdCephBufferAlgSimple(std::unique_ptr< IXrdCephBufferData > buffer, std::unique_ptr< ICephIOAdapter > cephio, int fd, bool useStriperlessReads=true)
virtual ssize_t write(const void *buff, off_t offset, size_t blen) override
write data through the buffer
virtual ssize_t rawRead(void *buff, off_t offset, size_t blen)
virtual ssize_t read(volatile void *buff, off_t offset, size_t blen) override
read data through the buffer
virtual ssize_t read_aio(XrdSfsAio *aoip) override
possible aio based code
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
virtual void doneWrite()=0
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...
Definition: BufferUtils.hh:29