XRootD
XrdCephOssBufferedFile.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3 // Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <sstream>
28 #include <iostream>
29 #include <fcntl.h>
30 #include <iomanip>
31 #include <new>
32 #include <ctime>
33 #include <chrono>
34 #include <thread>
35 
36 #include "XrdCeph/XrdCephPosix.hh"
37 #include "XrdOuc/XrdOucEnv.hh"
38 #include "XrdSys/XrdSysError.hh"
39 #include "XrdOuc/XrdOucTrace.hh"
40 #include "XrdSfs/XrdSfsAio.hh"
42 
48 
49 #include <thread>
50 
51 using namespace XrdCephBuffer;
52 using namespace std::chrono_literals;
53 
56 
57 
59  size_t buffersize,const std::string& bufferIOmode,
60  size_t maxNumberSimulBuffers):
61  XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),
62  m_maxCountReadBuffers(maxNumberSimulBuffers),
63  m_maxBufferRetrySleepTime_ms(1000),
64  m_bufsize(buffersize),
65  m_bufferIOmode(bufferIOmode)
66 {
67 
68 }
69 
71  // XrdCephEroute.Say("XrdCephOssBufferedFile::Destructor");
72 
73  // remember to delete the inner XrdCephOssFile object
74  if (m_xrdOssDF) {
75  delete m_xrdOssDF;
76  m_xrdOssDF = nullptr;
77  }
78 
79 }
80 
81 
82 int XrdCephOssBufferedFile::Open(const char *path, int flags, mode_t mode, XrdOucEnv &env) {
83 
84  int rc = m_xrdOssDF->Open(path, flags, mode, env);
85  if (rc < 0) {
86  return rc;
87  }
89  BUFLOG("XrdCephOssBufferedFile::Open got fd: " << m_fd << " " << path);
90  m_flags = flags; // e.g. for write/read knowledge
91  m_path = path; // good to keep the path for final stats presentation
92 
93 
94  // start the timer
95  //m_timestart = std::chrono::steady_clock::now();
96  m_timestart = std::chrono::system_clock::now();
97  // return the file descriptor
98  return rc;
99 }
100 
101 int XrdCephOssBufferedFile::Close(long long *retsz) {
102  // if data is still in the buffer and we are writing, make sure to write it
103  if (m_bufferAlg && (m_flags & (O_WRONLY|O_RDWR)) != 0) {
104  ssize_t rc = m_bufferAlg->flushWriteCache();
105  if (rc < 0) {
106  LOGCEPH( "XrdCephOssBufferedFile::Close: flush Error fd: " << m_fd << " rc:" << rc );
107  // still try to close the file
108  ssize_t rc2 = m_xrdOssDF->Close(retsz);
109  if (rc2 < 0) {
110  LOGCEPH( "XrdCephOssBufferedFile::Close: Close error after flush Error fd: " << m_fd << " rc:" << rc2 );
111  }
112  return rc; // return the original flush error
113  } else {
114  LOGCEPH( "XrdCephOssBufferedFile::Close: Flushed data on close fd: " << m_fd << " rc:" << rc );
115  }
116  } // check for write
117  const std::chrono::time_point<std::chrono::system_clock> now =
118  std::chrono::system_clock::now();
119  const std::time_t t_s = std::chrono::system_clock::to_time_t(m_timestart);
120  const std::time_t t_c = std::chrono::system_clock::to_time_t(now);
121 
122  auto t_dur = std::chrono::duration_cast<std::chrono::milliseconds>(now - m_timestart).count();
123 
124  LOGCEPH("XrdCephOssBufferedFile::Summary: {\"fd\":" << m_fd << ", \"Elapsed_time_ms\":" << t_dur
125  << ", \"path\":\"" << m_path
126  << "\", read_B:" << m_bytesRead.load()
127  << ", readV_B:" << m_bytesReadV.load()
128  << ", readAIO_B:" << m_bytesReadAIO.load()
129  << ", writeB:" << m_bytesWrite.load()
130  << ", writeAIO_B:" << m_bytesWriteAIO.load()
131  << ", startTime:\"" << std::put_time(std::localtime(&t_s), "%F %T") << "\", endTime:\""
132  << std::put_time(std::localtime(&t_c), "%F %T") << "\""
133  << ", nBuffersRead:" << m_bufferReadAlgs.size()
134  << "}");
135 
136  return m_xrdOssDF->Close(retsz);
137 }
138 
139 
140 ssize_t XrdCephOssBufferedFile::ReadV(XrdOucIOVec *readV, int rnum) {
141  // don't touch readV in the buffering method
142  ssize_t rc = m_xrdOssDF->ReadV(readV,rnum);
143  if (rc > 0) m_bytesReadV.fetch_add(rc);
144  return rc;
145 }
146 
147 ssize_t XrdCephOssBufferedFile::Read(off_t offset, size_t blen) {
148  return m_xrdOssDF->Read(offset, blen);
149 }
150 
151 ssize_t XrdCephOssBufferedFile::Read(void *buff, off_t offset, size_t blen) {
152  size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
153 
154  IXrdCephBufferAlg * buffer{nullptr};
155  // check for, and create if needed, a buffer
156  {
157  // lock in case need to create a new algorithm instance
158  const std::lock_guard<std::mutex> lock(m_buf_mutex);
159  auto buffer_itr = m_bufferReadAlgs.find(thread_id);
160  if (buffer_itr == m_bufferReadAlgs.end()) {
161  // only create a buffer, if we haven't hit the max buffers yet
162  auto buffer_ptr = createBuffer();
163  if (buffer_ptr) {
164  buffer = buffer_ptr.get();
165  m_bufferReadAlgs[thread_id] = std::move(buffer_ptr);
166  } else {
167  // if we can't create a buffer, we just have to pass through the read ...
168  ssize_t rc = m_xrdOssDF->Read(buff, offset, blen);
169  if (rc >= 0) {
170  LOGCEPH( "XrdCephOssBufferedFile::Read buffers and read failed with rc: " << rc );
171  }
172  return rc;
173  }
174  } else {
175  buffer = buffer_itr->second.get();
176  }
177  } // scope of lock
178 
179  int retry_counter{m_maxBufferRetries};
180  ssize_t rc {0};
181  while (retry_counter > 0) {
182  rc = buffer->read(buff, offset, blen);
183  if (rc != -EBUSY) break; // either worked, or is a real non busy error
184  LOGCEPH( "XrdCephOssBufferedFile::Read Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. "
185  << " rc:" << rc << " off:" << offset << " len:" << blen);
186  std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
187  --retry_counter;
188  }
189  if (retry_counter == 0) {
190  // reach maximum attempts for ebusy retry; fail the job
191  LOGCEPH( "XrdCephOssBufferedFile::Read Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
192  << " rc:" << rc << " off:" << offset << " len:" << blen );
193  // set a permanent error code:
194  rc = -EIO;
195  }
196  if (rc >=0) {
197  m_bytesRead.fetch_add(rc);
198  } else {
199  LOGCEPH( "XrdCephOssBufferedFile::Read: Read error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
200  }
201  // LOGCEPH( "XrdCephOssBufferedFile::Read: Read good fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
202  return rc;
203 }
204 
206  size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
207  IXrdCephBufferAlg * buffer{nullptr};
208  // check for, and create if needed, a buffer
209  {
210  // lock in case need to create a new algorithm instance
211  const std::lock_guard<std::mutex> lock(m_buf_mutex);
212  auto buffer_itr = m_bufferReadAlgs.find(thread_id);
213  if (buffer_itr == m_bufferReadAlgs.end()) {
214  m_bufferReadAlgs[thread_id] = createBuffer();
215  buffer = m_bufferReadAlgs.find(thread_id)->second.get();
216  } else {
217  buffer = buffer_itr->second.get();
218  }
219  }
220 
221  // LOGCEPH("XrdCephOssBufferedFile::AIOREAD: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
222  // << aiop->sfsAio.aio_offset << " "
223  // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
224  // << aiop->sfsAio.aio_fildes );
225  ssize_t rc = buffer->read_aio(aiop);
226  if (rc > 0) {
227  m_bytesReadAIO.fetch_add(rc);
228  } else {
229  LOGCEPH( "XrdCephOssBufferedFile::Read: ReadAIO error fd: " << m_fd << " rc:" << rc
230  << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
231  }
232  return rc;
233 }
234 
235 ssize_t XrdCephOssBufferedFile::ReadRaw(void *buff, off_t offset, size_t blen) {
236  // #TODO; ReadRaw should bypass the buffer ?
237  return m_xrdOssDF->ReadRaw(buff, offset, blen);
238 }
239 
241  return m_xrdOssDF->Fstat(buff);
242 }
243 
244 ssize_t XrdCephOssBufferedFile::Write(const void *buff, off_t offset, size_t blen) {
245 
246  if (!m_bufferAlg) {
248  if (!m_bufferAlg) {
249  LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
250  return -EINVAL;
251  }
252  }
253 
254 
255  int retry_counter{m_maxBufferRetries};
256  ssize_t rc {0};
257  while (retry_counter > 0) {
258  rc = m_bufferAlg->write(buff, offset, blen);
259  if (rc != -EBUSY) break; // either worked, or is a real non busy error
260  LOGCEPH( "XrdCephOssBufferedFile::Write Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. "
261  << " rc:" << rc << " off:" << offset << " len:" << blen);
262  std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
263  --retry_counter;
264  }
265  if (retry_counter == 0) {
266  // reach maximum attempts for ebusy retry; fail the job
267  LOGCEPH( "XrdCephOssBufferedFile::Write Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
268  << " rc:" << rc << " off:" << offset << " len:" << blen );
269  // set a permanent error code:
270  rc = -EIO;
271  }
272  if (rc >=0) {
273  m_bytesWrite.fetch_add(rc);
274  } else {
275  LOGCEPH( "XrdCephOssBufferedFile::Write: Write error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
276  }
277  return rc;
278 }
279 
281  if (!m_bufferAlg) {
283  if (!m_bufferAlg) {
284  LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
285  return -EINVAL;
286  }
287  }
288 
289  // LOGCEPH("XrdCephOssBufferedFile::AIOWRITE: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
290  // << aiop->sfsAio.aio_offset << " "
291  // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
292  // << aiop->sfsAio.aio_fildes << " " );
293  ssize_t rc = m_bufferAlg->write_aio(aiop);
294  if (rc > 0) {
295  m_bytesWriteAIO.fetch_add(rc);
296  } else {
297  LOGCEPH( "XrdCephOssBufferedFile::Write: WriteAIO error fd: " << m_fd << " rc:" << rc
298  << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
299  }
300  return rc;
301 
302 }
303 
305  return m_xrdOssDF->Fsync();
306 }
307 
308 int XrdCephOssBufferedFile::Ftruncate(unsigned long long len) {
309  return m_xrdOssDF->Ftruncate(len);
310 }
311 
312 
313 std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> XrdCephOssBufferedFile::createBuffer() {
314  std::unique_ptr<IXrdCephBufferAlg> bufferAlg;
315 
316  size_t bufferSize {m_bufsize}; // create buffer of default size
317  if (m_bufferReadAlgs.size() >= m_maxCountReadBuffers) {
318  BUFLOG("XrdCephOssBufferedFile: buffer reached max number of simul-buffers for this file: creating only 1MiB buffer" );
319  bufferSize = 1048576;
320  } else {
321  BUFLOG("XrdCephOssBufferedFile: buffer: got " << m_bufferReadAlgs.size() << " buffers already");
322  }
323 
324  try {
325  std::unique_ptr<IXrdCephBufferData> cephbuffer = std::unique_ptr<IXrdCephBufferData>(new XrdCephBufferDataSimple(bufferSize));
326  std::unique_ptr<ICephIOAdapter> cephio;
327  if (m_bufferIOmode == "aio") {
328  cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd));
329  } else if (m_bufferIOmode == "io") {
330  cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(),m_fd,
332  } else {
333  BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " );
334  m_xrdOssDF->Close();
335  return bufferAlg; // invalid instance;
336  }
337 
338  LOGCEPH( "XrdCephOssBufferedFile::Open: fd: " << m_fd << " Buffer created: " << cephbuffer->capacity() );
339  bufferAlg = std::unique_ptr<IXrdCephBufferAlg>(new XrdCephBufferAlgSimple(std::move(cephbuffer),std::move(cephio),m_fd) );
340  } catch (const std::bad_alloc &e) {
341  BUFLOG("XrdCephOssBufferedFile: Bad memory allocation in buffer: " << e.what() );
342  }
343 
344  return bufferAlg;
345  }
#define BUFLOG(x)
Definition: BufferUtils.hh:23
XrdSysError XrdCephEroute
XrdOucTrace XrdCephTrace
#define LOGCEPH(x)
Definition: XrdCephPosix.hh:51
int stat(const char *path, struct stat *buf)
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
Implements a non-async read and write to ceph via aio ceph_posix calls Using the standard ceph_posix_...
Implements a non-async read and write to ceph via ceph_posix calls Using the standard ceph_posix_ cal...
Interface to a holder of the main logic decisions of the buffering algorithm, decoupled from the buf...
Implementation of a buffer using a simple vector<char> Simplest implementation of a buffer using vect...
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
std::atomic< size_t > m_bytesRead
virtual int Ftruncate(unsigned long long)
std::chrono::time_point< std::chrono::system_clock > m_timestart
int m_maxBufferRetrySleepTime_ms
How many times to retry a read from a buffer with EBUSY errors.
int m_maxBufferRetries
set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual int Fstat(struct stat *buff)
std::atomic< size_t > m_bytesReadV
number of bytes read or written
std::atomic< size_t > m_bytesWrite
number of bytes read or written
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
XrdCephOss * m_cephoss
create a new instance of the buffer
virtual ssize_t Write(const void *buff, off_t offset, size_t blen)
std::atomic< size_t > m_bytesReadAIO
number of bytes read or written
std::atomic< size_t > m_bytesWriteAIO
number of bytes read or written
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Read(off_t offset, size_t blen)
virtual int Close(long long *retsz=0)
int m_flags
number of ms to sleep if a retry is requested
size_t m_maxCountReadBuffers
any data access method on the buffer will use this
virtual ssize_t ReadRaw(void *, off_t, size_t)
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer()
XrdCephOssBufferedFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF, size_t buffersize, const std::string &bufferIOmode, size_t maxNumberSimulBuffers)
virtual ssize_t Read(off_t offset, size_t blen)
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual int Fstat(struct stat *buff)
virtual int Ftruncate(unsigned long long)
virtual int Fsync(void)
virtual ssize_t ReadRaw(void *, off_t, size_t)
virtual ssize_t ReadV(XrdOucIOVec *readV, int n)
virtual int Close(long long *retsz=0)
virtual int getFileDescriptor() const
int m_useDefaultPreadAlg
Definition: XrdCephOss.hh:78
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...
Definition: BufferUtils.hh:29