XRootD
XrdCephBuffer::CephIOAdapterAIORaw Class Reference

Implements a non-async read and write to ceph via aio ceph_posix calls. The standard ceph_posix_aio calls are used to do the actual read and write operations. No ownership is taken of the buffer that is passed via the constructor. Although aio calls are used, we block here until the data has been read/written. More...

#include <CephIOAdapterAIORaw.hh>

+ Inheritance diagram for XrdCephBuffer::CephIOAdapterAIORaw:
+ Collaboration diagram for XrdCephBuffer::CephIOAdapterAIORaw:

Public Member Functions

 CephIOAdapterAIORaw (IXrdCephBufferData *bufferdata, int fd)
 
virtual ~CephIOAdapterAIORaw ()
 
virtual ssize_t read (off64_t offset, size_t count) override
 Issues a ceph_aio_read to read count bytes from the file, starting at offset, into the buffer data, and blocks until the read has completed. No range checking is currently provided here; the caller must provide sufficient space for the maximum length read. Returns a negative error code on failure, else the number of bytes read. More...
 
virtual ssize_t write (off64_t offset, size_t count) override
 Takes the data in the buffer and writes it to ceph at the given offset. Issues a ceph_aio_write for the data in the buffer (from position 0) into ceph at position offset with length count, and blocks until the write has completed. Returns a negative value on error, else the number of bytes written. More...
 
- Public Member Functions inherited from XrdCephBuffer::ICephIOAdapter
virtual ~ICephIOAdapter ()
 

Detailed Description

Implements a non-async read and write to ceph via aio ceph_posix calls. The standard ceph_posix_aio calls are used to do the actual read and write operations. No ownership is taken of the buffer that is passed via the constructor. Although aio calls are used, we block here until the data has been read/written.

Definition at line 59 of file CephIOAdapterAIORaw.hh.

Constructor & Destructor Documentation

◆ CephIOAdapterAIORaw()

CephIOAdapterAIORaw::CephIOAdapterAIORaw ( IXrdCephBufferData * bufferdata,
int  fd 
)

Definition at line 54 of file CephIOAdapterAIORaw.cc.

54  : m_bufferdata(bufferdata), m_fd(fd)
55 {
56 }

◆ ~CephIOAdapterAIORaw()

CephIOAdapterAIORaw::~CephIOAdapterAIORaw ( )
virtual

Definition at line 58 of file CephIOAdapterAIORaw.cc.

59 {
60  // nothing to specifically to do; just print out some stats
61  float read_speed{0}, write_speed{0};
62  if (m_stats_read_req.load() > 0) {
63  read_speed = m_stats_read_bytes.load() / m_stats_read_timer.load() * 1e-3;
64  }
65  if (m_stats_write_req.load() > 0) {
66  write_speed = m_stats_write_bytes.load() / m_stats_write_timer.load() * 1e-3;
67  }
68  BUFLOG("CephIOAdapterAIORaw::Summary fd:" << m_fd
69  << " nwrite:" << m_stats_write_req << " byteswritten:" << m_stats_write_bytes << " write_s:"
70  << m_stats_write_timer * 1e-3 << " writemax_s" << m_stats_write_longest * 1e-3
71  << " write_MBs:" << write_speed
72  << " nread:" << m_stats_read_req << " bytesread:" << m_stats_read_bytes << " read_s:"
73  << m_stats_read_timer * 1e-3 << " readmax_s:" << m_stats_read_longest * 1e-3
74  << " read_MBs:" << read_speed );
75 }
#define BUFLOG(x)
Definition: BufferUtils.hh:23

References BUFLOG.

Member Function Documentation

◆ read()

ssize_t CephIOAdapterAIORaw::read ( off64_t  offset,
size_t  count 
)
overridevirtual

Issues a ceph_aio_read to read count bytes from the file, starting at offset, into the buffer data, and blocks until the read has completed. No range checking is currently provided here; the caller must provide sufficient space for the maximum length read. Returns a negative error code on failure, else the number of bytes read.

Parameters
offset
count
Returns
ssize_t

Implements XrdCephBuffer::ICephIOAdapter.

Definition at line 127 of file CephIOAdapterAIORaw.cc.

128 {
129  void *buf = m_bufferdata->raw();
130  if (!buf)
131  {
132  BUFLOG("CephIOAdapterAIORaw::read null buffer was provided.")
133  return -EINVAL;
134  }
135 
136  std::unique_ptr<XrdSfsAio> aiop = std::unique_ptr<XrdSfsAio>(new CephBufSfsAio());
137  aiocb &sfsAio = aiop->sfsAio;
138  // set the necessary parameters for the read, e.g. buffer pointer, offset and length
139  sfsAio.aio_buf = buf;
140  sfsAio.aio_nbytes = count;
141  sfsAio.aio_offset = offset;
142  // need the concrete object for the blocking / wait
143  CephBufSfsAio *ceph_aiop = dynamic_cast<CephBufSfsAio *>(aiop.get());
144 
145  long dt_ns{0};
146  ssize_t rc{0};
147  { // timer brace RAII
148  XrdCephBuffer::Timer_ns timer(dt_ns);
149  // no check is made whether the buffer has sufficient capacity
150  // rc = ceph_posix_pread(m_fd,buf,count,offset);
151  //BUFLOG("Submit aio read: ");
152  rc = ceph_aio_read(m_fd, aiop.get(), aioReadCallback);
153 
154  if (rc < 0)
155  return rc;
156 
157  // now block until the read is done
158  // take the lock on the aio object
159  // while(!ceph_aiop->isDone()) { ceph_aiop->m_condVar.wait(lock,std::bind(&CephBufSfsAio::isDone,ceph_aiop) ); }
160  while (!ceph_aiop->isDone())
161  {
162  ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop));
163  }
164  } // timer brace
165 
166  // cleanup
167  rc = ceph_aiop->Result;
168 
169  m_stats_read_longest = std::max(m_stats_read_longest, dt_ns / 1000000);
170  m_stats_read_timer.fetch_add(dt_ns * 1e-6);
171  m_stats_read_bytes.fetch_add(rc);
172  ++m_stats_read_req;
173 
174  // BUFLOG("CephIOAdapterAIORaw::read fd:" << m_fd << " " << offset
175  // << " " << count << " " << rc << " " << dt_ns * 1e-6);
176 
177  if (rc >= 0)
178  {
179  m_bufferdata->setLength(rc);
180  m_bufferdata->setStartingOffset(offset);
181  m_bufferdata->setValid(true);
182  }
183  return rc;
184 }
static void aioReadCallback(XrdSfsAio *aiop, size_t rc)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
virtual off_t setStartingOffset(off_t offset)=0
virtual void setLength(size_t len)=0
Currently occupied and valid space, which may be less than capacity.
virtual void setValid(bool isValid)=0
virtual const void * raw() const =0
write data into the buffer, store the external offset

References aiocb::aio_buf, aiocb::aio_nbytes, aiocb::aio_offset, aioReadCallback(), BUFLOG, ceph_aio_read(), XrdCephBuffer::CephBufSfsAio::isDone(), XrdCephBuffer::CephBufSfsAio::m_condVar, XrdCephBuffer::CephBufSfsAio::m_lock, XrdCephBuffer::IXrdCephBufferData::raw(), XrdSfsAio::Result, XrdCephBuffer::IXrdCephBufferData::setLength(), XrdCephBuffer::IXrdCephBufferData::setStartingOffset(), and XrdCephBuffer::IXrdCephBufferData::setValid().

+ Here is the call graph for this function:

◆ write()

ssize_t CephIOAdapterAIORaw::write ( off64_t  offset,
size_t  count 
)
overridevirtual

Takes the data in the buffer and writes it to ceph at the given offset. Issues a ceph_aio_write for the data in the buffer (from position 0) into ceph at position offset with length count, and blocks until the write has completed. Returns a negative value on error, else the number of bytes written.

Parameters
offset
count
Returns
ssize_t

Implements XrdCephBuffer::ICephIOAdapter.

Definition at line 77 of file CephIOAdapterAIORaw.cc.

78 {
79  void *buf = m_bufferdata->raw();
80  if (!buf) {
81  BUFLOG("CephIOAdapterAIORaw::write null buffer was provided.")
82  return -EINVAL;
83  }
84  //BUFLOG("Make aio");
85  std::unique_ptr<XrdSfsAio> aiop = std::unique_ptr<XrdSfsAio>(new CephBufSfsAio());
86  aiocb &sfsAio = aiop->sfsAio;
87  // set the necessary parameters for the read, e.g. buffer pointer, offset and length
88  sfsAio.aio_buf = buf;
89  sfsAio.aio_nbytes = count;
90  sfsAio.aio_offset = offset;
91  // need the concrete object for the blocking / wait
92  CephBufSfsAio *ceph_aiop = dynamic_cast<CephBufSfsAio *>(aiop.get());
93 
94  long dt_ns{0};
95  ssize_t rc{0};
96  { // brace is for timer RAII
97  XrdCephBuffer::Timer_ns timer(dt_ns);
98  rc = ceph_aio_write(m_fd, aiop.get(), aioWriteCallback);
99 
100  if (rc < 0) {
101  BUFLOG("CephIOAdapterAIORaw::write ceph_aio_write returned rc:" << rc)
102  return rc;
103  }
104 
105  while (!ceph_aiop->isDone())
106  {
107  ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop));
108  }
109  } // timer brace
110 
111  // cleanup
112  rc = ceph_aiop->Result;
113  if (rc < 0) {
114  BUFLOG("CephIOAdapterAIORaw::write ceph_aiop->Result returned rc:" << rc)
115  }
116 
117  // BUFLOG("CephIOAdapterAIORaw::write fd:" << m_fd << " off:"
118  // << offset << " len:" << count << " rc:" << rc << " ms:" << dt_ns / 1000000);
119 
120  m_stats_write_longest = std::max(m_stats_write_longest, dt_ns / 1000000);
121  m_stats_write_timer.fetch_add(dt_ns / 1000000);
122  m_stats_write_bytes.fetch_add(rc);
123  ++m_stats_write_req;
124  return rc;
125 }
static void aioWriteCallback(XrdSfsAio *aiop, size_t rc)
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)

References aiocb::aio_buf, aiocb::aio_nbytes, aiocb::aio_offset, aioWriteCallback(), BUFLOG, ceph_aio_write(), XrdCephBuffer::CephBufSfsAio::isDone(), XrdCephBuffer::CephBufSfsAio::m_condVar, XrdCephBuffer::CephBufSfsAio::m_lock, XrdCephBuffer::IXrdCephBufferData::raw(), and XrdSfsAio::Result.

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: