25 #include <sys/types.h>
52 using namespace std::chrono_literals;
59 size_t buffersize,
const std::string& bufferIOmode,
60 size_t maxNumberSimulBuffers):
61 XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),
62 m_maxCountReadBuffers(maxNumberSimulBuffers),
63 m_maxBufferRetrySleepTime_ms(1000),
64 m_bufsize(buffersize),
65 m_bufferIOmode(bufferIOmode)
89 BUFLOG(
"XrdCephOssBufferedFile::Open got fd: " <<
m_fd <<
" " << path);
106 LOGCEPH(
"XrdCephOssBufferedFile::Close: flush Error fd: " <<
m_fd <<
" rc:" << rc );
110 LOGCEPH(
"XrdCephOssBufferedFile::Close: Close error after flush Error fd: " <<
m_fd <<
" rc:" << rc2 );
114 LOGCEPH(
"XrdCephOssBufferedFile::Close: Flushed data on close fd: " <<
m_fd <<
" rc:" << rc );
117 const std::chrono::time_point<std::chrono::system_clock> now =
118 std::chrono::system_clock::now();
119 const std::time_t t_s = std::chrono::system_clock::to_time_t(
m_timestart);
120 const std::time_t t_c = std::chrono::system_clock::to_time_t(now);
122 auto t_dur = std::chrono::duration_cast<std::chrono::milliseconds>(now -
m_timestart).count();
124 LOGCEPH(
"XrdCephOssBufferedFile::Summary: {\"fd\":" <<
m_fd <<
", \"Elapsed_time_ms\":" << t_dur
125 <<
", \"path\":\"" <<
m_path
131 <<
", startTime:\"" << std::put_time(std::localtime(&t_s),
"%F %T") <<
"\", endTime:\""
132 << std::put_time(std::localtime(&t_c),
"%F %T") <<
"\""
152 size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
158 const std::lock_guard<std::mutex> lock(
m_buf_mutex);
164 buffer = buffer_ptr.get();
170 LOGCEPH(
"XrdCephOssBufferedFile::Read buffers and read failed with rc: " << rc );
175 buffer = buffer_itr->second.get();
181 while (retry_counter > 0) {
182 rc = buffer->read(buff, offset, blen);
183 if (rc != -EBUSY)
break;
184 LOGCEPH(
"XrdCephOssBufferedFile::Read Recieved EBUSY for fd: " <<
m_fd <<
" on try: " << (
m_maxBufferRetries-retry_counter) <<
". Sleeping .. "
185 <<
" rc:" << rc <<
" off:" << offset <<
" len:" << blen);
189 if (retry_counter == 0) {
191 LOGCEPH(
"XrdCephOssBufferedFile::Read Max attempts for fd: " <<
m_fd <<
" on try: " << (
m_maxBufferRetries-retry_counter) <<
". Terminating with -EIO: "
192 <<
" rc:" << rc <<
" off:" << offset <<
" len:" << blen );
199 LOGCEPH(
"XrdCephOssBufferedFile::Read: Read error fd: " <<
m_fd <<
" rc:" << rc <<
" off:" << offset <<
" len:" << blen);
206 size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
211 const std::lock_guard<std::mutex> lock(
m_buf_mutex);
217 buffer = buffer_itr->second.get();
225 ssize_t rc = buffer->read_aio(aiop);
229 LOGCEPH(
"XrdCephOssBufferedFile::Read: ReadAIO error fd: " <<
m_fd <<
" rc:" << rc
249 LOGCEPH(
"XrdCephOssBufferedFile: Error in creating buffered object");
257 while (retry_counter > 0) {
259 if (rc != -EBUSY)
break;
260 LOGCEPH(
"XrdCephOssBufferedFile::Write Recieved EBUSY for fd: " <<
m_fd <<
" on try: " << (
m_maxBufferRetries-retry_counter) <<
". Sleeping .. "
261 <<
" rc:" << rc <<
" off:" << offset <<
" len:" << blen);
265 if (retry_counter == 0) {
267 LOGCEPH(
"XrdCephOssBufferedFile::Write Max attempts for fd: " <<
m_fd <<
" on try: " << (
m_maxBufferRetries-retry_counter) <<
". Terminating with -EIO: "
268 <<
" rc:" << rc <<
" off:" << offset <<
" len:" << blen );
275 LOGCEPH(
"XrdCephOssBufferedFile::Write: Write error fd: " <<
m_fd <<
" rc:" << rc <<
" off:" << offset <<
" len:" << blen);
284 LOGCEPH(
"XrdCephOssBufferedFile: Error in creating buffered object");
297 LOGCEPH(
"XrdCephOssBufferedFile::Write: WriteAIO error fd: " <<
m_fd <<
" rc:" << rc
314 std::unique_ptr<IXrdCephBufferAlg> bufferAlg;
318 BUFLOG(
"XrdCephOssBufferedFile: buffer reached max number of simul-buffers for this file: creating only 1MiB buffer" );
319 bufferSize = 1048576;
325 std::unique_ptr<IXrdCephBufferData> cephbuffer = std::unique_ptr<IXrdCephBufferData>(
new XrdCephBufferDataSimple(bufferSize));
326 std::unique_ptr<ICephIOAdapter> cephio;
333 BUFLOG(
"XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " );
338 LOGCEPH(
"XrdCephOssBufferedFile::Open: fd: " <<
m_fd <<
" Buffer created: " << cephbuffer->capacity() );
340 }
catch (
const std::bad_alloc &e) {
341 BUFLOG(
"XrdCephOssBufferedFile: Bad memory allocation in buffer: " << e.what() );
XrdSysError XrdCephEroute
int stat(const char *path, struct stat *buf)
Implements a non-async read and write to ceph via aio ceph_posix calls Using the standard ceph_posix_...
Implements a non-async read and write to ceph via ceph_posix calls Using the standard ceph_posix_ cal...
Interface to a holder of the main logic decisions of the buffering algorithm, decoupled from the buf...
Implementation of a buffer using a simple vector<char> Simplest implementation of a buffer using vect...
std::string m_bufferIOmode
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
std::atomic< size_t > m_bytesRead
virtual int Ftruncate(unsigned long long)
std::chrono::time_point< std::chrono::system_clock > m_timestart
int m_maxBufferRetrySleepTime_ms
How many times to retry a read from a buffer with EBUSY errors.
int m_maxBufferRetries
set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual int Fstat(struct stat *buff)
std::atomic< size_t > m_bytesReadV
number of bytes read or written
std::atomic< size_t > m_bytesWrite
number of bytes read or written
virtual ~XrdCephOssBufferedFile()
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
XrdCephOss * m_cephoss
create a new instance of the buffer
virtual ssize_t Write(const void *buff, off_t offset, size_t blen)
std::atomic< size_t > m_bytesReadAIO
number of bytes read or written
std::atomic< size_t > m_bytesWriteAIO
number of bytes read or written
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Read(off_t offset, size_t blen)
virtual int Close(long long *retsz=0)
int m_flags
number of ms to sleep if a retry is requested
XrdCephOssFile * m_xrdOssDF
size_t m_maxCountReadBuffers
any data access method on the buffer will use this
virtual ssize_t ReadRaw(void *, off_t, size_t)
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer()
XrdCephOssBufferedFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF, size_t buffersize, const std::string &bufferIOmode, size_t maxNumberSimulBuffers)
virtual ssize_t Read(off_t offset, size_t blen)
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual int Fstat(struct stat *buff)
virtual int Ftruncate(unsigned long long)
virtual ssize_t ReadRaw(void *, off_t, size_t)
virtual ssize_t ReadV(XrdOucIOVec *readV, int n)
virtual int Close(long long *retsz=0)
virtual int getFileDescriptor() const
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...