XRootD
CephIOAdapterAIORaw.cc
Go to the documentation of this file.
1 #include "CephIOAdapterAIORaw.hh"
2 #include "../XrdCephPosix.hh"
3 #include "XrdOuc/XrdOucEnv.hh"
4 
5 #include <iostream>
6 #include <chrono>
7 #include <ratio>
8 #include <functional>
9 #include <memory>
10 #include <thread>
11 #include <chrono>
12 
13 using namespace XrdCephBuffer;
14 
15 using myclock = std::chrono::steady_clock;
16 //using myseconds = std::chrono::duration<float,
17 
18 namespace
19 {
20  static void aioReadCallback(XrdSfsAio *aiop, size_t rc)
21  {
22  // as in XrdCephOssFile
23  aiop->Result = rc;
24  aiop->doneRead();
25  }
26  static void aioWriteCallback(XrdSfsAio *aiop, size_t rc)
27  {
28  aiop->Result = rc;
29  aiop->doneWrite();
30  }
31 
32 } // anonymous namespace
33 
34 CephBufSfsAio::CephBufSfsAio() : m_lock(m_mutex)
35 {
36 }
37 
39 {
40  //BUFLOG("DoneRead");
41  m_dataOpDone = true;
42  m_lock.unlock();
43  m_condVar.notify_all();
44 }
45 
47 {
48  //BUFLOG("DoneWrite");
49  m_dataOpDone = true;
50  m_lock.unlock();
51  m_condVar.notify_all();
52 }
53 
54 CephIOAdapterAIORaw::CephIOAdapterAIORaw(IXrdCephBufferData *bufferdata, int fd) : m_bufferdata(bufferdata), m_fd(fd)
55 {
56 }
57 
59 {
60  // nothing to specifically to do; just print out some stats
61  float read_speed{0}, write_speed{0};
62  if (m_stats_read_req.load() > 0) {
63  read_speed = m_stats_read_bytes.load() / m_stats_read_timer.load() * 1e-3;
64  }
65  if (m_stats_write_req.load() > 0) {
66  write_speed = m_stats_write_bytes.load() / m_stats_write_timer.load() * 1e-3;
67  }
68  BUFLOG("CephIOAdapterAIORaw::Summary fd:" << m_fd
69  << " nwrite:" << m_stats_write_req << " byteswritten:" << m_stats_write_bytes << " write_s:"
70  << m_stats_write_timer * 1e-3 << " writemax_s" << m_stats_write_longest * 1e-3
71  << " write_MBs:" << write_speed
72  << " nread:" << m_stats_read_req << " bytesread:" << m_stats_read_bytes << " read_s:"
73  << m_stats_read_timer * 1e-3 << " readmax_s:" << m_stats_read_longest * 1e-3
74  << " read_MBs:" << read_speed );
75 }
76 
77 ssize_t CephIOAdapterAIORaw::write(off64_t offset, size_t count)
78 {
79  void *buf = m_bufferdata->raw();
80  if (!buf) {
81  BUFLOG("CephIOAdapterAIORaw::write null buffer was provided.")
82  return -EINVAL;
83  }
84  //BUFLOG("Make aio");
85  std::unique_ptr<XrdSfsAio> aiop = std::unique_ptr<XrdSfsAio>(new CephBufSfsAio());
86  aiocb &sfsAio = aiop->sfsAio;
87  // set the necessary parameters for the read, e.g. buffer pointer, offset and length
88  sfsAio.aio_buf = buf;
89  sfsAio.aio_nbytes = count;
90  sfsAio.aio_offset = offset;
91  // need the concrete object for the blocking / wait
92  CephBufSfsAio *ceph_aiop = dynamic_cast<CephBufSfsAio *>(aiop.get());
93 
94  long dt_ns{0};
95  ssize_t rc{0};
96  { // brace is for timer RAII
97  XrdCephBuffer::Timer_ns timer(dt_ns);
98  rc = ceph_aio_write(m_fd, aiop.get(), aioWriteCallback);
99 
100  if (rc < 0) {
101  BUFLOG("CephIOAdapterAIORaw::write ceph_aio_write returned rc:" << rc)
102  return rc;
103  }
104 
105  while (!ceph_aiop->isDone())
106  {
107  ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop));
108  }
109  } // timer brace
110 
111  // cleanup
112  rc = ceph_aiop->Result;
113  if (rc < 0) {
114  BUFLOG("CephIOAdapterAIORaw::write ceph_aiop->Result returned rc:" << rc)
115  }
116 
117  // BUFLOG("CephIOAdapterAIORaw::write fd:" << m_fd << " off:"
118  // << offset << " len:" << count << " rc:" << rc << " ms:" << dt_ns / 1000000);
119 
120  m_stats_write_longest = std::max(m_stats_write_longest, dt_ns / 1000000);
121  m_stats_write_timer.fetch_add(dt_ns / 1000000);
122  m_stats_write_bytes.fetch_add(rc);
123  ++m_stats_write_req;
124  return rc;
125 }
126 
127 ssize_t CephIOAdapterAIORaw::read(off64_t offset, size_t count)
128 {
129  void *buf = m_bufferdata->raw();
130  if (!buf)
131  {
132  BUFLOG("CephIOAdapterAIORaw::read null buffer was provided.")
133  return -EINVAL;
134  }
135 
136  std::unique_ptr<XrdSfsAio> aiop = std::unique_ptr<XrdSfsAio>(new CephBufSfsAio());
137  aiocb &sfsAio = aiop->sfsAio;
138  // set the necessary parameters for the read, e.g. buffer pointer, offset and length
139  sfsAio.aio_buf = buf;
140  sfsAio.aio_nbytes = count;
141  sfsAio.aio_offset = offset;
142  // need the concrete object for the blocking / wait
143  CephBufSfsAio *ceph_aiop = dynamic_cast<CephBufSfsAio *>(aiop.get());
144 
145  long dt_ns{0};
146  ssize_t rc{0};
147  { // timer brace RAII
148  XrdCephBuffer::Timer_ns timer(dt_ns);
149  // no check is made whether the buffer has sufficient capacity
150  // rc = ceph_posix_pread(m_fd,buf,count,offset);
151  //BUFLOG("Submit aio read: ");
152  rc = ceph_aio_read(m_fd, aiop.get(), aioReadCallback);
153 
154  if (rc < 0)
155  return rc;
156 
157  // now block until the read is done
158  // take the lock on the aio object
159  // while(!ceph_aiop->isDone()) { ceph_aiop->m_condVar.wait(lock,std::bind(&CephBufSfsAio::isDone,ceph_aiop) ); }
160  while (!ceph_aiop->isDone())
161  {
162  ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop));
163  }
164  } // timer brace
165 
166  // cleanup
167  rc = ceph_aiop->Result;
168 
169  m_stats_read_longest = std::max(m_stats_read_longest, dt_ns / 1000000);
170  m_stats_read_timer.fetch_add(dt_ns * 1e-6);
171  m_stats_read_bytes.fetch_add(rc);
172  ++m_stats_read_req;
173 
174  // BUFLOG("CephIOAdapterAIORaw::read fd:" << m_fd << " " << offset
175  // << " " << count << " " << rc << " " << dt_ns * 1e-6);
176 
177  if (rc >= 0)
178  {
179  m_bufferdata->setLength(rc);
180  m_bufferdata->setStartingOffset(offset);
181  m_bufferdata->setValid(true);
182  }
183  return rc;
184 }
#define BUFLOG(x)
Definition: BufferUtils.hh:23
std::chrono::steady_clock myclock
static void aioReadCallback(XrdSfsAio *aiop, size_t rc)
static void aioWriteCallback(XrdSfsAio *aiop, size_t rc)
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
virtual void doneWrite() override
std::unique_lock< std::mutex > m_lock
std::condition_variable m_condVar
virtual void doneRead() override
virtual ssize_t read(off64_t offset, size_t count) override
Issue a ceph_posix_pread to read to the buffer data from file offset and len count....
virtual ssize_t write(off64_t offset, size_t count) override
Take the data in the buffer and write to ceph at given offset Issues a ceph_posix_pwrite for data in ...
CephIOAdapterAIORaw(IXrdCephBufferData *bufferdata, int fd)
Interface to the Buffer's physical representation. Allow an interface to encapsulate the requirements...
virtual off_t setStartingOffset(off_t offset)=0
virtual void setLength(size_t len)=0
Currently occupied and valid space, which may be less than capacity.
virtual void setValid(bool isValid)=0
virtual const void * raw() const =0
write data into the buffer, store the external offset
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void doneRead()=0
virtual void doneWrite()=0
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...
Definition: BufferUtils.hh:29