XRootD
XrdCephOssBufferedFile Class Reference

#include <XrdCephOssBufferedFile.hh>

+ Inheritance diagram for XrdCephOssBufferedFile:
+ Collaboration diagram for XrdCephOssBufferedFile:

Public Member Functions

 XrdCephOssBufferedFile (XrdCephOss *cephoss, XrdCephOssFile *cephossDF, size_t buffersize, const std::string &bufferIOmode, size_t maxNumberSimulBuffers)
 
virtual ~XrdCephOssBufferedFile ()
 
virtual int Close (long long *retsz=0)
 
virtual int Fstat (struct stat *buff)
 
virtual int Fsync (void)
 
virtual int Ftruncate (unsigned long long)
 
virtual int Open (const char *path, int flags, mode_t mode, XrdOucEnv &env)
 
virtual ssize_t Read (off_t offset, size_t blen)
 
virtual ssize_t Read (void *buff, off_t offset, size_t blen)
 
virtual int Read (XrdSfsAio *aoip)
 
virtual ssize_t ReadRaw (void *, off_t, size_t)
 
virtual ssize_t ReadV (XrdOucIOVec *readV, int rdvcnt)
 
virtual ssize_t Write (const void *buff, off_t offset, size_t blen)
 
virtual int Write (XrdSfsAio *aiop)
 
- Public Member Functions inherited from XrdCephOssFile
 XrdCephOssFile (XrdCephOss *cephoss)
 
virtual ~XrdCephOssFile ()
 
virtual int getFileDescriptor () const
 
- Public Member Functions inherited from XrdOssDF
 XrdOssDF (const char *tid="", uint16_t dftype=0, int fdnum=-1)
 
virtual ~XrdOssDF ()
 
uint16_t DFType ()
 
virtual int Fchmod (mode_t mode)
 
virtual int Fctl (int cmd, int alen, const char *args, char **resp=0)
 
virtual void Flush ()
 Flush filesystem cached pages for this file (used for checksums). More...
 
virtual int Fsync (XrdSfsAio *aiop)
 
virtual int getFD ()
 
virtual off_t getMmap (void **addr)
 
virtual const char * getTID ()
 
virtual int isCompressed (char *cxidp=0)
 
virtual int Opendir (const char *path, XrdOucEnv &env)
 
virtual ssize_t pgRead (void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
 
virtual int pgRead (XrdSfsAio *aioparm, uint64_t opts)
 
virtual ssize_t pgWrite (void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
 
virtual int pgWrite (XrdSfsAio *aioparm, uint64_t opts)
 
virtual int Readdir (char *buff, int blen)
 
virtual int StatRet (struct stat *buff)
 
virtual ssize_t WriteV (XrdOucIOVec *writeV, int wrvcnt)
 

Protected Member Functions

std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer ()
 

Protected Attributes

std::mutex m_buf_mutex
 
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
 
std::string m_bufferIOmode
 
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
 
size_t m_bufsize = 16*1024*1024L
 
std::atomic< size_t > m_bytesRead = {0}
 
std::atomic< size_t > m_bytesReadAIO = {0}
 number of bytes read or written More...
 
std::atomic< size_t > m_bytesReadV = {0}
 number of bytes read or written More...
 
std::atomic< size_t > m_bytesWrite = {0}
 number of bytes read or written More...
 
std::atomic< size_t > m_bytesWriteAIO = {0}
 number of bytes read or written More...
 
XrdCephOss * m_cephoss = nullptr
 pointer to the XrdCephOss instance passed to the constructor More...
 
int m_flags = 0
 open flags passed to Open(), kept e.g. for write/read knowledge More...
 
int m_maxBufferRetries {5}
 How many times to retry a read from a buffer with EBUSY errors. More...
 
int m_maxBufferRetrySleepTime_ms
 number of ms to sleep if a retry is requested More...
 
size_t m_maxCountReadBuffers {10}
 set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads) More...
 
std::string m_path
 
std::chrono::time_point< std::chrono::system_clock > m_timestart
 
XrdCephOssFile * m_xrdOssDF = nullptr
 
- Protected Attributes inherited from XrdCephOssFile
XrdCephOss * m_cephOss
 
int m_fd
 
- Protected Attributes inherited from XrdOssDF
uint16_t dfType
 
int fd
 
off_t pgwEOF
 
short rsvd
 
const char * tident
 

Additional Inherited Members

- Static Public Attributes inherited from XrdOssDF
static const uint16_t DF_isDir = 0x0001
 Object is for a directory. More...
 
static const uint16_t DF_isFile = 0x0002
 Object is for a file. More...
 
static const uint16_t DF_isProxy = 0x0010
 Object is a proxy object. More...
 
static const uint64_t doCalc = 0x4000000000000000ULL
 pgw: Calculate checksums More...
 
static const int Fctl_ckpObj = 0
 
static const int Fctl_utimes = 1
 
static const uint64_t Verify = 0x8000000000000000ULL
 all: Verify checksums More...
 

Detailed Description

Decorator class XrdCephOssBufferedFile, designed to wrap XrdCephOssFile. It provides functionality for buffered access to/from data in Ceph, to avoid inefficient small reads / writes from the client side.

Definition at line 48 of file XrdCephOssBufferedFile.hh.

Constructor & Destructor Documentation

◆ XrdCephOssBufferedFile()

XrdCephOssBufferedFile::XrdCephOssBufferedFile ( XrdCephOss *  cephoss,
XrdCephOssFile *  cephossDF,
size_t  buffersize,
const std::string &  bufferIOmode,
size_t  maxNumberSimulBuffers 
)

Definition at line 58 of file XrdCephOssBufferedFile.cc.

60  :
61  XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),
62  m_maxCountReadBuffers(maxNumberSimulBuffers),
64  m_bufsize(buffersize),
65  m_bufferIOmode(bufferIOmode)
66 {
67 
68 }
int m_maxBufferRetrySleepTime_ms
number of ms to sleep if a retry is requested
XrdCephOss * m_cephoss
pointer to the XrdCephOss instance passed to the constructor
size_t m_maxCountReadBuffers
set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)
XrdCephOssFile(XrdCephOss *cephoss)

◆ ~XrdCephOssBufferedFile()

XrdCephOssBufferedFile::~XrdCephOssBufferedFile ( )
virtual

Definition at line 70 of file XrdCephOssBufferedFile.cc.

70  {
71  // XrdCephEroute.Say("XrdCephOssBufferedFile::Destructor");
72 
73  // remember to delete the inner XrdCephOssFile object
74  if (m_xrdOssDF) {
75  delete m_xrdOssDF;
76  m_xrdOssDF = nullptr;
77  }
78 
79 }

References m_xrdOssDF.

Member Function Documentation

◆ Close()

int XrdCephOssBufferedFile::Close ( long long *  retsz = 0)
virtual

Close a directory or file.

Parameters
retsz - If not nil, where the size of the file is to be returned.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 101 of file XrdCephOssBufferedFile.cc.

101  {
102  // if data is still in the buffer and we are writing, make sure to write it
103  if (m_bufferAlg && (m_flags & (O_WRONLY|O_RDWR)) != 0) {
104  ssize_t rc = m_bufferAlg->flushWriteCache();
105  if (rc < 0) {
106  LOGCEPH( "XrdCephOssBufferedFile::Close: flush Error fd: " << m_fd << " rc:" << rc );
107  // still try to close the file
108  ssize_t rc2 = m_xrdOssDF->Close(retsz);
109  if (rc2 < 0) {
110  LOGCEPH( "XrdCephOssBufferedFile::Close: Close error after flush Error fd: " << m_fd << " rc:" << rc2 );
111  }
112  return rc; // return the original flush error
113  } else {
114  LOGCEPH( "XrdCephOssBufferedFile::Close: Flushed data on close fd: " << m_fd << " rc:" << rc );
115  }
116  } // check for write
117  const std::chrono::time_point<std::chrono::system_clock> now =
118  std::chrono::system_clock::now();
119  const std::time_t t_s = std::chrono::system_clock::to_time_t(m_timestart);
120  const std::time_t t_c = std::chrono::system_clock::to_time_t(now);
121 
122  auto t_dur = std::chrono::duration_cast<std::chrono::milliseconds>(now - m_timestart).count();
123 
124  LOGCEPH("XrdCephOssBufferedFile::Summary: {\"fd\":" << m_fd << ", \"Elapsed_time_ms\":" << t_dur
125  << ", \"path\":\"" << m_path
126  << "\", read_B:" << m_bytesRead.load()
127  << ", readV_B:" << m_bytesReadV.load()
128  << ", readAIO_B:" << m_bytesReadAIO.load()
129  << ", writeB:" << m_bytesWrite.load()
130  << ", writeAIO_B:" << m_bytesWriteAIO.load()
131  << ", startTime:\"" << std::put_time(std::localtime(&t_s), "%F %T") << "\", endTime:\""
132  << std::put_time(std::localtime(&t_c), "%F %T") << "\""
133  << ", nBuffersRead:" << m_bufferReadAlgs.size()
134  << "}");
135 
136  return m_xrdOssDF->Close(retsz);
137 }
#define LOGCEPH(x)
Definition: XrdCephPosix.hh:51
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
std::atomic< size_t > m_bytesRead
std::chrono::time_point< std::chrono::system_clock > m_timestart
std::atomic< size_t > m_bytesReadV
number of bytes read or written
std::atomic< size_t > m_bytesWrite
number of bytes read or written
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
std::atomic< size_t > m_bytesReadAIO
number of bytes read or written
std::atomic< size_t > m_bytesWriteAIO
number of bytes read or written
int m_flags
open flags passed to Open(), kept e.g. for write/read knowledge
virtual int Close(long long *retsz=0)

References XrdCephOssFile::Close(), LOGCEPH, m_bufferAlg, m_bufferReadAlgs, m_bytesRead, m_bytesReadAIO, m_bytesReadV, m_bytesWrite, m_bytesWriteAIO, XrdCephOssFile::m_fd, m_flags, m_path, m_timestart, and m_xrdOssDF.

+ Here is the call graph for this function:

◆ createBuffer()

std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > XrdCephOssBufferedFile::createBuffer ( )
protected

Definition at line 313 of file XrdCephOssBufferedFile.cc.

313  {
314  std::unique_ptr<IXrdCephBufferAlg> bufferAlg;
315 
316  size_t bufferSize {m_bufsize}; // create buffer of default size
317  if (m_bufferReadAlgs.size() >= m_maxCountReadBuffers) {
318  BUFLOG("XrdCephOssBufferedFile: buffer reached max number of simul-buffers for this file: creating only 1MiB buffer" );
319  bufferSize = 1048576;
320  } else {
321  BUFLOG("XrdCephOssBufferedFile: buffer: got " << m_bufferReadAlgs.size() << " buffers already");
322  }
323 
324  try {
325  std::unique_ptr<IXrdCephBufferData> cephbuffer = std::unique_ptr<IXrdCephBufferData>(new XrdCephBufferDataSimple(bufferSize));
326  std::unique_ptr<ICephIOAdapter> cephio;
327  if (m_bufferIOmode == "aio") {
328  cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd));
329  } else if (m_bufferIOmode == "io") {
330  cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(),m_fd,
332  } else {
333  BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " );
334  m_xrdOssDF->Close();
335  return bufferAlg; // invalid instance;
336  }
337 
338  LOGCEPH( "XrdCephOssBufferedFile::Open: fd: " << m_fd << " Buffer created: " << cephbuffer->capacity() );
339  bufferAlg = std::unique_ptr<IXrdCephBufferAlg>(new XrdCephBufferAlgSimple(std::move(cephbuffer),std::move(cephio),m_fd) );
340  } catch (const std::bad_alloc &e) {
341  BUFLOG("XrdCephOssBufferedFile: Bad memory allocation in buffer: " << e.what() );
342  }
343 
344  return bufferAlg;
345  }
#define BUFLOG(x)
Definition: BufferUtils.hh:23
Implements a non-async read and write to ceph via aio ceph_posix calls Using the standard ceph_posix_...
Implements a non-async read and write to ceph via ceph_posix calls Using the standard ceph_posix_ cal...
Implementation of a buffer using a simple vector<char> Simplest implementation of a buffer using vect...
int m_useDefaultPreadAlg
Definition: XrdCephOss.hh:78

References BUFLOG, XrdCephOssFile::Close(), LOGCEPH, m_bufferIOmode, m_bufferReadAlgs, m_bufsize, m_cephoss, XrdCephOssFile::m_fd, m_maxCountReadBuffers, XrdCephOss::m_useDefaultPreadAlg, and m_xrdOssDF.

Referenced by Read(), and Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Fstat()

int XrdCephOssBufferedFile::Fstat ( struct stat *  buf)
virtual

Return state information for this file.

Parameters
buf - Pointer to the structure where info is to be returned.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 240 of file XrdCephOssBufferedFile.cc.

240  {
241  return m_xrdOssDF->Fstat(buff);
242 }
virtual int Fstat(struct stat *buff)

References XrdCephOssFile::Fstat(), and m_xrdOssDF.

+ Here is the call graph for this function:

◆ Fsync()

int XrdCephOssBufferedFile::Fsync ( void  )
virtual

Synchronize associated file with media (synchronous).

Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 304 of file XrdCephOssBufferedFile.cc.

304  {
305  return m_xrdOssDF->Fsync();
306 }
virtual int Fsync(void)

References XrdCephOssFile::Fsync(), and m_xrdOssDF.

+ Here is the call graph for this function:

◆ Ftruncate()

int XrdCephOssBufferedFile::Ftruncate ( unsigned long long  flen)
virtual

Set the size of the associated file.

Parameters
flen- The new size of the file.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 308 of file XrdCephOssBufferedFile.cc.

308  {
309  return m_xrdOssDF->Ftruncate(len);
310 }
virtual int Ftruncate(unsigned long long)

References XrdCephOssFile::Ftruncate(), and m_xrdOssDF.

+ Here is the call graph for this function:

◆ Open()

int XrdCephOssBufferedFile::Open ( const char *  path,
int  Oflag,
mode_t  Mode,
XrdOucEnv &  env 
)
virtual

Open a file.

Parameters
path- Pointer to the path of the file to be opened.
Oflag- Standard open flags.
Mode- File open mode (ignored unless creating a file).
env- Reference to environmental information.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 82 of file XrdCephOssBufferedFile.cc.

82  {
83 
84  int rc = m_xrdOssDF->Open(path, flags, mode, env);
85  if (rc < 0) {
86  return rc;
87  }
89  BUFLOG("XrdCephOssBufferedFile::Open got fd: " << m_fd << " " << path);
90  m_flags = flags; // e.g. for write/read knowledge
91  m_path = path; // good to keep the path for final stats presentation
92 
93 
94  // start the timer
95  //m_timestart = std::chrono::steady_clock::now();
96  m_timestart = std::chrono::system_clock::now();
97  // return the file descriptor
98  return rc;
99 }
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual int getFileDescriptor() const

References BUFLOG, XrdCephOssFile::getFileDescriptor(), XrdCephOssFile::m_fd, m_flags, m_path, m_timestart, m_xrdOssDF, and XrdCephOssFile::Open().

+ Here is the call graph for this function:

◆ Read() [1/3]

ssize_t XrdCephOssBufferedFile::Read ( off_t  offset,
size_t  size 
)
virtual

Preread file blocks into the file system cache.

Parameters
offset- The offset where the read is to start.
size- The number of bytes to pre-read.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 147 of file XrdCephOssBufferedFile.cc.

147  {
148  return m_xrdOssDF->Read(offset, blen);
149 }
virtual ssize_t Read(off_t offset, size_t blen)

References m_xrdOssDF, and XrdCephOssFile::Read().

+ Here is the call graph for this function:

◆ Read() [2/3]

ssize_t XrdCephOssBufferedFile::Read ( void *  buffer,
off_t  offset,
size_t  size 
)
virtual

Read file bytes into a buffer.

Parameters
buffer- pointer to buffer where the bytes are to be placed.
offset- The offset where the read is to start.
size- The number of bytes to read.
Returns
>= 0 The number of bytes that were placed in buffer.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 151 of file XrdCephOssBufferedFile.cc.

151  {
152  size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
153 
154  IXrdCephBufferAlg * buffer{nullptr};
155  // check for, and create if needed, a buffer
156  {
157  // lock in case need to create a new algorithm instance
158  const std::lock_guard<std::mutex> lock(m_buf_mutex);
159  auto buffer_itr = m_bufferReadAlgs.find(thread_id);
160  if (buffer_itr == m_bufferReadAlgs.end()) {
161  // only create a buffer, if we haven't hit the max buffers yet
162  auto buffer_ptr = createBuffer();
163  if (buffer_ptr) {
164  buffer = buffer_ptr.get();
165  m_bufferReadAlgs[thread_id] = std::move(buffer_ptr);
166  } else {
167  // if we can't create a buffer, we just have to pass through the read ...
168  ssize_t rc = m_xrdOssDF->Read(buff, offset, blen);
169  if (rc >= 0) {
170  LOGCEPH( "XrdCephOssBufferedFile::Read buffers and read failed with rc: " << rc );
171  }
172  return rc;
173  }
174  } else {
175  buffer = buffer_itr->second.get();
176  }
177  } // scope of lock
178 
179  int retry_counter{m_maxBufferRetries};
180  ssize_t rc {0};
181  while (retry_counter > 0) {
182  rc = buffer->read(buff, offset, blen);
183  if (rc != -EBUSY) break; // either worked, or is a real non busy error
184  LOGCEPH( "XrdCephOssBufferedFile::Read Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. "
185  << " rc:" << rc << " off:" << offset << " len:" << blen);
186  std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
187  --retry_counter;
188  }
189  if (retry_counter == 0) {
190  // reach maximum attempts for ebusy retry; fail the job
191  LOGCEPH( "XrdCephOssBufferedFile::Read Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
192  << " rc:" << rc << " off:" << offset << " len:" << blen );
193  // set a permanent error code:
194  rc = -EIO;
195  }
196  if (rc >=0) {
197  m_bytesRead.fetch_add(rc);
198  } else {
199  LOGCEPH( "XrdCephOssBufferedFile::Read: Read error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
200  }
201  // LOGCEPH( "XrdCephOssBufferedFile::Read: Read good fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
202  return rc;
203 }
Interface to a holder of the main logic decisions of the buffering algortithm, decoupled from the buf...
int m_maxBufferRetries
How many times to retry a read from a buffer with EBUSY errors.
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer()

References createBuffer(), LOGCEPH, m_buf_mutex, m_bufferReadAlgs, m_bytesRead, XrdCephOssFile::m_fd, m_maxBufferRetries, m_maxBufferRetrySleepTime_ms, m_xrdOssDF, and XrdCephOssFile::Read().

+ Here is the call graph for this function:

◆ Read() [3/3]

int XrdCephOssBufferedFile::Read ( XrdSfsAio *  aiop)
virtual

Read file bytes using asynchronous I/O.

Parameters
aiop- Pointer to async I/O object controlling the I/O.
Returns
0 if the request was started successfully, or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 205 of file XrdCephOssBufferedFile.cc.

205  {
206  size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
207  IXrdCephBufferAlg * buffer{nullptr};
208  // check for, and create if needed, a buffer
209  {
210  // lock in case need to create a new algorithm instance
211  const std::lock_guard<std::mutex> lock(m_buf_mutex);
212  auto buffer_itr = m_bufferReadAlgs.find(thread_id);
213  if (buffer_itr == m_bufferReadAlgs.end()) {
214  m_bufferReadAlgs[thread_id] = createBuffer();
215  buffer = m_bufferReadAlgs.find(thread_id)->second.get();
216  } else {
217  buffer = buffer_itr->second.get();
218  }
219  }
220 
221  // LOGCEPH("XrdCephOssBufferedFile::AIOREAD: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
222  // << aiop->sfsAio.aio_offset << " "
223  // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
224  // << aiop->sfsAio.aio_fildes );
225  ssize_t rc = buffer->read_aio(aiop);
226  if (rc > 0) {
227  m_bytesReadAIO.fetch_add(rc);
228  } else {
229  LOGCEPH( "XrdCephOssBufferedFile::Read: ReadAIO error fd: " << m_fd << " rc:" << rc
230  << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
231  }
232  return rc;
233 }
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62

References aiocb::aio_nbytes, aiocb::aio_offset, createBuffer(), LOGCEPH, m_buf_mutex, m_bufferReadAlgs, m_bytesReadAIO, XrdCephOssFile::m_fd, and XrdSfsAio::sfsAio.

+ Here is the call graph for this function:

◆ ReadRaw()

ssize_t XrdCephOssBufferedFile::ReadRaw ( void *  buffer,
off_t  offset,
size_t  size 
)
virtual

Read uncompressed file bytes into a buffer.

Parameters
buffer- pointer to buffer where the bytes are to be placed.
offset- The offset where the read is to start.
size- The number of bytes to read.
Returns
>= 0 The number of bytes that were placed in buffer.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 235 of file XrdCephOssBufferedFile.cc.

235  {
236  // #TODO; ReadRaw should bypass the buffer ?
237  return m_xrdOssDF->ReadRaw(buff, offset, blen);
238 }
virtual ssize_t ReadRaw(void *, off_t, size_t)

References m_xrdOssDF, and XrdCephOssFile::ReadRaw().

+ Here is the call graph for this function:

◆ ReadV()

ssize_t XrdCephOssBufferedFile::ReadV ( XrdOucIOVec *  readV,
int  rdvcnt 
)
virtual

Read file bytes as directed by the read vector.

Parameters
readV - pointer to the array of read requests.
rdvcnt - the number of elements in readV.
Returns
>=0 The number of bytes read.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 140 of file XrdCephOssBufferedFile.cc.

140  {
141  // don't touch readV in the buffering method
142  ssize_t rc = m_xrdOssDF->ReadV(readV,rnum);
143  if (rc > 0) m_bytesReadV.fetch_add(rc);
144  return rc;
145 }
virtual ssize_t ReadV(XrdOucIOVec *readV, int n)

References m_bytesReadV, m_xrdOssDF, and XrdCephOssFile::ReadV().

+ Here is the call graph for this function:

◆ Write() [1/2]

ssize_t XrdCephOssBufferedFile::Write ( const void *  buffer,
off_t  offset,
size_t  size 
)
virtual

Write file bytes from a buffer.

Parameters
buffer- pointer to buffer where the bytes reside.
offset- The offset where the write is to start.
size- The number of bytes to write.
Returns
>= 0 The number of bytes that were written.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 244 of file XrdCephOssBufferedFile.cc.

244  {
245 
246  if (!m_bufferAlg) {
248  if (!m_bufferAlg) {
249  LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
250  return -EINVAL;
251  }
252  }
253 
254 
255  int retry_counter{m_maxBufferRetries};
256  ssize_t rc {0};
257  while (retry_counter > 0) {
258  rc = m_bufferAlg->write(buff, offset, blen);
259  if (rc != -EBUSY) break; // either worked, or is a real non busy error
260  LOGCEPH( "XrdCephOssBufferedFile::Write Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. "
261  << " rc:" << rc << " off:" << offset << " len:" << blen);
262  std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
263  --retry_counter;
264  }
265  if (retry_counter == 0) {
266  // reach maximum attempts for ebusy retry; fail the job
267  LOGCEPH( "XrdCephOssBufferedFile::Write Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
268  << " rc:" << rc << " off:" << offset << " len:" << blen );
269  // set a permanent error code:
270  rc = -EIO;
271  }
272  if (rc >=0) {
273  m_bytesWrite.fetch_add(rc);
274  } else {
275  LOGCEPH( "XrdCephOssBufferedFile::Write: Write error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
276  }
277  return rc;
278 }

References createBuffer(), LOGCEPH, m_bufferAlg, m_bytesWrite, XrdCephOssFile::m_fd, m_maxBufferRetries, and m_maxBufferRetrySleepTime_ms.

+ Here is the call graph for this function:

◆ Write() [2/2]

int XrdCephOssBufferedFile::Write ( XrdSfsAio *  aiop)
virtual

Write file bytes using asynchronous I/O.

Parameters
aiop- Pointer to async I/O object controlling the I/O.
Returns
0 if the request was started successfully, or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 280 of file XrdCephOssBufferedFile.cc.

280  {
281  if (!m_bufferAlg) {
283  if (!m_bufferAlg) {
284  LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
285  return -EINVAL;
286  }
287  }
288 
289  // LOGCEPH("XrdCephOssBufferedFile::AIOWRITE: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
290  // << aiop->sfsAio.aio_offset << " "
291  // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
292  // << aiop->sfsAio.aio_fildes << " " );
293  ssize_t rc = m_bufferAlg->write_aio(aiop);
294  if (rc > 0) {
295  m_bytesWriteAIO.fetch_add(rc);
296  } else {
297  LOGCEPH( "XrdCephOssBufferedFile::Write: WriteAIO error fd: " << m_fd << " rc:" << rc
298  << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
299  }
300  return rc;
301 
302 }

References aiocb::aio_nbytes, aiocb::aio_offset, createBuffer(), LOGCEPH, m_bufferAlg, m_bytesWriteAIO, XrdCephOssFile::m_fd, and XrdSfsAio::sfsAio.

+ Here is the call graph for this function:

Member Data Documentation

◆ m_buf_mutex

std::mutex XrdCephOssBufferedFile::m_buf_mutex
protected

Definition at line 76 of file XrdCephOssBufferedFile.hh.

Referenced by Read().

◆ m_bufferAlg

std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> XrdCephOssBufferedFile::m_bufferAlg
protected

Definition at line 74 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Write().

◆ m_bufferIOmode

std::string XrdCephOssBufferedFile::m_bufferIOmode
protected

Definition at line 85 of file XrdCephOssBufferedFile.hh.

Referenced by createBuffer().

◆ m_bufferReadAlgs

std::map<size_t, std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> > XrdCephOssBufferedFile::m_bufferReadAlgs
protected

Definition at line 75 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), createBuffer(), and Read().

◆ m_bufsize

size_t XrdCephOssBufferedFile::m_bufsize = 16*1024*1024L
protected

Definition at line 84 of file XrdCephOssBufferedFile.hh.

Referenced by createBuffer().

◆ m_bytesRead

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesRead = {0}
protected

Definition at line 88 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Read().

◆ m_bytesReadAIO

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesReadAIO = {0}
protected

number of bytes read or written

Definition at line 90 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Read().

◆ m_bytesReadV

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesReadV = {0}
protected

number of bytes read or written

Definition at line 89 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and ReadV().

◆ m_bytesWrite

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesWrite = {0}
protected

number of bytes read or written

Definition at line 91 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Write().

◆ m_bytesWriteAIO

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesWriteAIO = {0}
protected

number of bytes read or written

Definition at line 92 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Write().

◆ m_cephoss

XrdCephOss* XrdCephOssBufferedFile::m_cephoss = nullptr
protected

pointer to the XrdCephOss instance passed to the constructor

Definition at line 72 of file XrdCephOssBufferedFile.hh.

Referenced by createBuffer().

◆ m_flags

int XrdCephOssBufferedFile::m_flags = 0
protected

open flags passed to Open(), kept e.g. for write/read knowledge

Definition at line 83 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Open().

◆ m_maxBufferRetries

int XrdCephOssBufferedFile::m_maxBufferRetries {5}
protected

How many times to retry a read from a buffer with EBUSY errors.

Definition at line 80 of file XrdCephOssBufferedFile.hh.

Referenced by Read(), and Write().

◆ m_maxBufferRetrySleepTime_ms

int XrdCephOssBufferedFile::m_maxBufferRetrySleepTime_ms
protected

number of ms to sleep if a retry is requested

Definition at line 81 of file XrdCephOssBufferedFile.hh.

Referenced by Read(), and Write().

◆ m_maxCountReadBuffers

size_t XrdCephOssBufferedFile::m_maxCountReadBuffers {10}
protected

set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)

Definition at line 77 of file XrdCephOssBufferedFile.hh.

Referenced by createBuffer().

◆ m_path

std::string XrdCephOssBufferedFile::m_path
protected

Definition at line 86 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Open().

◆ m_timestart

std::chrono::time_point<std::chrono::system_clock> XrdCephOssBufferedFile::m_timestart
protected

Definition at line 87 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Open().

◆ m_xrdOssDF

XrdCephOssFile* XrdCephOssBufferedFile::m_xrdOssDF = nullptr
protected

The documentation for this class was generated from the following files: