7 #include "../XrdCephPosix.hh"
21 std::unique_ptr<ICephIOAdapter> cephio,
int fd,
22 bool useStriperlessReads):
23 m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd),
24 m_useStriperlessReads(useStriperlessReads) {
29 int prec = std::cout.precision();
30 float bytesBuffered = m_stats_bytes_fromceph - m_stats_bytes_bypassed;
31 float cacheUseFraction = bytesBuffered > 0 ? (1.*(m_stats_bytes_toclient-m_stats_bytes_bypassed)/bytesBuffered) : 1. ;
33 BUFLOG(
"XrdCephBufferAlgSimple::Destructor, fd=" << m_fd
34 <<
", retrieved_bytes=" << m_stats_bytes_fromceph
35 <<
", bypassed_bytes=" << m_stats_bytes_bypassed
36 <<
", delivered_bytes=" << m_stats_bytes_toclient
37 << std::setprecision(4)
38 <<
", cache_hit_frac=" << cacheUseFraction << std::setprecision(prec));
58 rc =
read(buf, offset, blen);
95 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex);
103 if (blen == 0)
return 0;
109 if (blen >= m_bufferdata->capacity()) {
113 m_bufferdata->invalidate();
119 m_stats_bytes_fromceph += rc;
120 m_stats_bytes_toclient += rc;
121 m_stats_bytes_bypassed += rc;
127 size_t bytesRemaining = blen;
128 off_t offsetDelta = 0;
129 size_t bytesRead = 0;
135 while (bytesRemaining > 0) {
138 bool loadCache =
false;
140 if (m_bufferLength == 0) {
143 }
else if (offset < m_bufferStartingOffset) {
146 }
else if (offset >= (off_t) (m_bufferStartingOffset + m_bufferLength) ) {
149 }
else if ((offset - m_bufferStartingOffset + offsetDelta) >= (off_t)m_bufferLength) {
160 m_bufferdata->invalidate();
162 rc = m_cephio->read(offset + offsetDelta, m_bufferdata->capacity());
165 BUFLOG(
"LoadCache Error: " << rc);
168 m_stats_bytes_fromceph += rc;
169 m_bufferStartingOffset = offset + offsetDelta;
180 off_t bufPosition = offset + offsetDelta - m_bufferStartingOffset;
181 rc = m_bufferdata->readBuffer( (
void*) &(((
char*)buf)[offsetDelta]) , bufPosition , bytesRemaining);
184 BUFLOG(
"Reading from Cache Failed: " << rc <<
" " << offset <<
" "
185 << offsetDelta <<
" " << m_bufferStartingOffset <<
" "
186 << bufPosition <<
" "
196 m_stats_bytes_toclient += rc;
199 bytesRemaining -= rc;
210 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex);
215 ssize_t bytesWrittenToStorage(0);
227 off_t expected_offset = (off_t)(m_bufferStartingOffset + m_bufferLength);
229 if ((offset != expected_offset) && (m_bufferLength > 0) ) {
232 BUFLOG(
"Non expected offset: " << rc <<
" " << offset <<
" " << expected_offset);
241 if ( (m_bufferStartingOffset % m_bufferdata->capacity()) != 0 ) {
242 BUFLOG(
" Non aligned offset?" << m_bufferStartingOffset <<
" "
243 << m_bufferdata->capacity() <<
" " << m_bufferStartingOffset % m_bufferdata->capacity() );
278 if ((offset != expected_offset) && (m_bufferLength > 0) ) {
279 BUFLOG(
"Error trying to write out of order: expeted at: " << expected_offset
280 <<
" got offset" << offset <<
" of len " << blen);
284 BUFLOG(
"Got a negative offset: " << offset);
289 size_t bytesRemaining = blen;
290 size_t bytesWritten = 0;
295 while (bytesRemaining > 0) {
299 if (m_bufferLength == m_bufferdata->capacity()) {
304 bytesWrittenToStorage += rc;
307 if (m_bufferLength == 0) {
309 m_bufferStartingOffset = offset + bytesWritten;
313 size_t nBytesToWrite = std::min(bytesRemaining, m_bufferdata->capacity()-m_bufferLength);
314 const void* bufAtOffset = (
void*)((
char*)buf + bytesWritten);
315 if (nBytesToWrite == 0) {
316 BUFLOG(
"Wanting to write 0 bytes; why is that?");
318 rc = m_bufferdata->writeBuffer(bufAtOffset, m_bufferLength, nBytesToWrite, 0);
320 BUFLOG(
"WriteBuffer step failed: " << rc <<
" " << m_bufferLength <<
" " << blen <<
" " << offset );
323 if (rc != (ssize_t)nBytesToWrite) {
324 BUFLOG(
"WriteBuffer returned unexpected number of bytes: " << rc <<
" Expected: " << nBytesToWrite <<
" "
325 << m_bufferLength <<
" " << blen <<
" " << offset );
330 m_bufferLength += rc;
332 bytesRemaining -= rc;
339 if (m_bufferLength == m_bufferdata->capacity()){
345 bytesWrittenToStorage += rc;
357 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex);
360 if (m_bufferLength == 0) {
361 BUFLOG(
"Empty buffer to flush: ");
365 if (m_bufferLength > 0) {
366 rc = m_cephio->write(m_bufferStartingOffset, m_bufferLength);
368 BUFLOG(
"WriteBuffer write step failed: " << rc);
374 m_bufferStartingOffset=0;
375 m_bufferdata->invalidate();
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper)
virtual ssize_t write_aio(XrdSfsAio *aoip) override
possible aio based code
virtual ssize_t rawWrite(void *buff, off_t offset, size_t blen)
virtual ~XrdCephBufferAlgSimple()
virtual ssize_t flushWriteCache() override
remember to flush the cache on final writes
XrdCephBufferAlgSimple(std::unique_ptr< IXrdCephBufferData > buffer, std::unique_ptr< ICephIOAdapter > cephio, int fd, bool useStriperlessReads=true)
virtual ssize_t write(const void *buff, off_t offset, size_t blen) override
write data through the buffer
virtual ssize_t rawRead(void *buff, off_t offset, size_t blen)
virtual ssize_t read(volatile void *buff, off_t offset, size_t blen) override
read data through the buffer
virtual ssize_t read_aio(XrdSfsAio *aoip) override
possible aio based code
virtual void doneRead()=0
virtual void doneWrite()=0
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...