XRootD
XrdCephOssBufferedFile.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3 // Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef __XRD_CEPH_OSS_BUFFERED_FILE_HH__
26 #define __XRD_CEPH_OSS_BUFFERED_FILE_HH__
27 
28 #include "XrdOss/XrdOss.hh"
29 #include "XrdCeph/XrdCephOss.hh"
31 
35 
36 #include <memory>
37 #include <chrono>
38 #include <atomic>
39 #include <map>
40 #include <mutex>
41 
42 //------------------------------------------------------------------------------
46 //------------------------------------------------------------------------------
47 
48 class XrdCephOssBufferedFile : virtual public XrdCephOssFile { // XrdOssDF; virtually inherits XrdCephOssFile and redeclares its file I/O entry points (base declarations are not visible in this listing)
49 
50 public:
51  XrdCephOssBufferedFile(XrdCephOss *cephoss,XrdCephOssFile *cephossDF, size_t buffersize,
52  const std::string& bufferIOmode,
53  size_t maxNumberSimulBuffers); // NOTE(review): arguments are presumably stored in m_cephoss, m_xrdOssDF, m_bufsize, m_bufferIOmode and m_maxCountReadBuffers -- confirm in the .cc
54  //explicit XrdCephOssBufferedFile(size_t buffersize);
55  virtual ~XrdCephOssBufferedFile();
56  virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env); // takes the path, open flags, mode and an XrdOucEnv
57  virtual int Close(long long *retsz=0); // retsz defaults to a null pointer
58  virtual ssize_t Read(off_t offset, size_t blen); // Read overload that takes no destination buffer
59  virtual ssize_t Read(void *buff, off_t offset, size_t blen); // Read overload with a caller-supplied buffer
60  virtual int Read(XrdSfsAio *aoip); // Read overload taking an XrdSfsAio request object
61  virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt); // NOTE(review): presumably a vector read over rdvcnt entries of readV -- confirm
62  virtual ssize_t ReadRaw(void *, off_t, size_t);
63  virtual int Fstat(struct stat *buff);
64  virtual ssize_t Write(const void *buff, off_t offset, size_t blen); // Write overload with a caller-supplied buffer
65  virtual int Write(XrdSfsAio *aiop); // Write overload taking an XrdSfsAio request object
66  virtual int Fsync(void);
67  virtual int Ftruncate(unsigned long long);
68 
69 protected:
70  std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> createBuffer(); // returns a uniquely-owned IXrdCephBufferAlg
71 
72  XrdCephOss *m_cephoss = nullptr; // non-owning raw pointer, null until assigned
73  XrdCephOssFile * m_xrdOssDF = nullptr; // holder of the XrdCephOssFile instance
74  std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> m_bufferAlg; // single uniquely-owned buffer algorithm object
75  std::map<size_t, std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> > m_bufferReadAlgs; // buffer algorithm objects keyed by a size_t (meaning of the key is not visible here)
76  std::mutex m_buf_mutex; // NOTE(review): presumably guards m_bufferReadAlgs -- confirm in the .cc
77  size_t m_maxCountReadBuffers {10}; // defaults to 10
78 
79 // NOTE(review): listing lines 80-81 are absent from this rendering; the member index names m_maxBufferRetries and m_maxBufferRetrySleepTime_ms, which are not declared in the visible body
82 
83  int m_flags = 0; // NOTE(review): presumably the flags passed to Open() -- confirm
84  size_t m_bufsize = 16*1024*1024L; // default 16MiB size
85  std::string m_bufferIOmode; // buffer I/O mode string (accepted values are not visible here)
86  std::string m_path;
87  std::chrono::time_point<std::chrono::system_clock> m_timestart; // system_clock time point; where it is set is not visible here
88  std::atomic<size_t> m_bytesRead = {0}; // atomic byte counter, starts at 0
89  std::atomic<size_t> m_bytesReadV = {0}; // atomic byte counter, starts at 0
90  std::atomic<size_t> m_bytesReadAIO = {0}; // atomic byte counter, starts at 0
91  std::atomic<size_t> m_bytesWrite = {0}; // atomic byte counter, starts at 0
92  std::atomic<size_t> m_bytesWriteAIO= {0}; // atomic byte counter, starts at 0
93 };
94 
95 #endif /* __XRD_CEPH_OSS_BUFFERED_FILE_HH__ */
int stat(const char *path, struct stat *buf)
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
std::atomic< size_t > m_bytesRead
virtual int Ftruncate(unsigned long long)
std::chrono::time_point< std::chrono::system_clock > m_timestart
int m_maxBufferRetrySleepTime_ms
How many times to retry a read from a buffer that returns EBUSY errors.
int m_maxBufferRetries
set the maximum number of buffers to open on a single instance (e.g. for simultaneous file reads)
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual int Fstat(struct stat *buff)
std::atomic< size_t > m_bytesReadV
number of bytes read or written
std::atomic< size_t > m_bytesWrite
number of bytes read or written
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
XrdCephOss * m_cephoss
create a new instance of the buffer
virtual ssize_t Write(const void *buff, off_t offset, size_t blen)
std::atomic< size_t > m_bytesReadAIO
number of bytes read or written
std::atomic< size_t > m_bytesWriteAIO
number of bytes read or written
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Read(off_t offset, size_t blen)
virtual int Close(long long *retsz=0)
int m_flags
number of ms to sleep if a retry is requested
size_t m_maxCountReadBuffers
any data access method on the buffer will use this
virtual ssize_t ReadRaw(void *, off_t, size_t)
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer()
XrdCephOssBufferedFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF, size_t buffersize, const std::string &bufferIOmode, size_t maxNumberSimulBuffers)