XRootD
XrdCephOssReadVFile.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3 // Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <sstream>
28 #include <iostream>
29 #include <memory>
30 #include <algorithm>
31 #include <fcntl.h>
32 
33 #include "XrdCeph/XrdCephPosix.hh"
34 #include "XrdOuc/XrdOucEnv.hh"
35 #include "XrdSys/XrdSysError.hh"
36 #include "XrdOuc/XrdOucTrace.hh"
37 #include "XrdSfs/XrdSfsAio.hh"
39 
43 
44 using namespace XrdCephBuffer;
45 
48 
49 XrdCephOssReadVFile::XrdCephOssReadVFile(XrdCephOss *cephoss,XrdCephOssFile *cephossDF,const std::string& algname):
50 XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),m_algname(algname)
51 {
52  if (!m_xrdOssDF) XrdCephEroute.Say("XrdCephOssReadVFile::Null m_xrdOssDF");
53 
54  if (m_algname == "passthrough") { // #TODO consider to use a factory method. but this is simple enough for now
55  m_readVAdapter = std::unique_ptr<XrdCephBuffer::IXrdCephReadVAdapter>(new XrdCephBuffer::XrdCephReadVNoOp());
56  } else if (m_algname == "basic") {
57  m_readVAdapter = std::unique_ptr<XrdCephBuffer::IXrdCephReadVAdapter>(new XrdCephBuffer::XrdCephReadVBasic());
58  } else {
59  XrdCephEroute.Say("XrdCephOssReadVFile::ERROR Invalid ReadV algorthm passed; defaulting to passthrough");
60  m_algname = "passthrough";
61  m_readVAdapter = std::unique_ptr<XrdCephBuffer::IXrdCephReadVAdapter>(new XrdCephBuffer::XrdCephReadVNoOp());
62  }
63  LOGCEPH("XrdCephOssReadVFile Algorithm type: " << m_algname);
64 }
65 
67  if (m_xrdOssDF) {
68  delete m_xrdOssDF;
69  m_xrdOssDF = nullptr;
70  }
71 
72 }
73 
74 int XrdCephOssReadVFile::Open(const char *path, int flags, mode_t mode, XrdOucEnv &env) {
75  int rc = m_xrdOssDF->Open(path, flags, mode, env);
76  if (rc < 0) {
77  return rc;
78  }
80  LOGCEPH("XrdCephOssReadVFile::Open: fd: " << m_fd << " " << path );
81  return rc;
82 }
83 
84 int XrdCephOssReadVFile::Close(long long *retsz) {
85  LOGCEPH("XrdCephOssReadVFile::Close: retsz: " << retsz << " Time_ceph_s: " << m_timer_read_ns.load()*1e-9 << " count: "
86  << m_timer_count.load() << " size_B: " << m_timer_size.load()
87  << " longest_s:" << m_timer_longest.load()*1e-9);
88 
89  return m_xrdOssDF->Close(retsz);
90 }
91 
92 
93 ssize_t XrdCephOssReadVFile::ReadV(XrdOucIOVec *readV, int rnum) {
95  LOGCEPH("XrdCephOssReadVFile::ReadV: fd: " << fd << " " << rnum );
96 
97  std::stringstream msg_extents;
98  msg_extents << "XrdCephOssReadVFile::Extentslist={\"fd\": " << fd << ", \"EXTENTS\":[";
99 
100  ExtentHolder extents(rnum);
101  for (int i = 0; i < rnum; i++) {
102  extents.push_back(Extent(readV[i].offset, readV[i].size));
103  msg_extents << "[" << readV[i].offset << "," << readV[i].size << "]," ;
104  }
105  msg_extents << "]}";
106  //XrdCephEroute.Say(msg_extents.str().c_str()); msg_extents.clear();
107  if (m_extraLogging) {
108  // improve this so no wasted calls if logging is disabled
109  LOGCEPH(msg_extents.str());
110  msg_extents.clear();
111  }
112 
113  LOGCEPH("XrdCephOssReadVFile::Extents: fd: "<< fd << " " << extents.size() << " " << extents.len() << " "
114  << extents.begin() << " " << extents.end() << " " << extents.bytesContained()
115  << " " << extents.bytesMissing());
116 
117  // take the input set of extents and return a vector of merged extents (covering the range to read)
118  std::vector<ExtentHolder> mappedExtents = m_readVAdapter->convert(extents);
119 
120 
121  // counter is the iterator to the original readV elements, and is incremented for each chunk that's returned
122  int nbytes = 0, curCount = 0, counter(0);
123  size_t totalBytesRead(0), totalBytesUseful(0);
124 
125  // extract the largest range of the extents, and create a buffer.
126  size_t buffersize{0};
127  for (std::vector<ExtentHolder>::const_iterator ehit = mappedExtents.cbegin(); ehit!= mappedExtents.cend(); ++ehit ) {
128  buffersize = std::max(buffersize, ehit->len());
129  }
130  std::vector<char> buffer;
131  buffer.reserve(buffersize);
132 
133 
134  //LOGCEPH("mappedExtents: len: " << mappedExtents.size() );
135  for (std::vector<ExtentHolder>::const_iterator ehit = mappedExtents.cbegin(); ehit!= mappedExtents.cend(); ++ehit ) {
136  off_t off = ehit->begin();
137  size_t len = ehit->len();
138 
139  //LOGCEPH("outerloop: " << off << " " << len << " " << ehit->end() << " " << " " << ehit->size() );
140 
141  // read the full extent into the buffer
142  long timed_read_ns{0};
143  {Timer_ns ts(timed_read_ns);
144  curCount = m_xrdOssDF->Read(buffer.data(), off, len);
145  } // timer scope
146  ++m_timer_count;
147  auto l = m_timer_longest.load();
148  m_timer_longest.store(std::max(l,timed_read_ns)); // doesn't quite prevent race conditions
149  m_timer_read_ns.fetch_add(timed_read_ns);
150  m_timer_size.fetch_add(curCount);
151 
152  // check that the correct amount of data was read.
153  // std:: clog << "buf Read " << curCount << std::endl;
154  if (curCount != (ssize_t)len) {
155  return (curCount < 0 ? curCount : -ESPIPE);
156  }
157  totalBytesRead += curCount;
158  totalBytesUseful += ehit->bytesContained();
159 
160 
161  // now read out into the original readV requests for each of the held inner extents
162  const char* data = buffer.data();
163  const ExtentContainer& innerExtents = ehit->extents();
164  for (ExtentContainer::const_iterator it = innerExtents.cbegin(); it != innerExtents.cend(); ++it) {
165  off_t innerBegin = it->begin() - off;
166  off_t innerEnd = it->end() - off;
167  //LOGCEPH( "innerloop: " << innerBegin << " " << innerEnd << " " << off << " "
168  // << it->begin() << " " << it-> end() << " "
169  // << readV[counter].offset << " " << readV[counter].size);
170  std::copy(data+innerBegin, data+innerEnd, readV[counter].data );
171  nbytes += it->len();
172  ++counter; // next element
173  } // inner extents
174 
175  } // outer extents
176  LOGCEPH( "readV returning " << nbytes << " bytes: " << "Read: " <<totalBytesRead << " Useful: " << totalBytesUseful );
177  return nbytes;
178 
179 }
180 
181 ssize_t XrdCephOssReadVFile::Read(off_t offset, size_t blen) {
182  return m_xrdOssDF->Read(offset,blen);
183 }
184 
185 ssize_t XrdCephOssReadVFile::Read(void *buff, off_t offset, size_t blen) {
186  return m_xrdOssDF->Read(buff,offset,blen);
187 }
188 
190  return m_xrdOssDF->Read(aiop);
191 }
192 
193 ssize_t XrdCephOssReadVFile::ReadRaw(void *buff, off_t offset, size_t blen) {
194  return m_xrdOssDF->ReadRaw(buff, offset, blen);
195 }
196 
197 int XrdCephOssReadVFile::Fstat(struct stat *buff) {
198  return m_xrdOssDF->Fstat(buff);
199 }
200 
201 ssize_t XrdCephOssReadVFile::Write(const void *buff, off_t offset, size_t blen) {
202  return m_xrdOssDF->Write(buff,offset,blen);
203 }
204 
206  return m_xrdOssDF->Write(aiop);
207 }
208 
210  return m_xrdOssDF->Fsync();
211 }
212 
213 int XrdCephOssReadVFile::Ftruncate(unsigned long long len) {
214  return m_xrdOssDF->Ftruncate(len);
215 }
XrdSysError XrdCephEroute
XrdOucTrace XrdCephTrace
static std::string ts()
timestamp output for logging messages
Definition: XrdCephOss.cc:53
#define LOGCEPH(x)
Definition: XrdCephPosix.hh:51
int stat(const char *path, struct stat *buf)
Designed to hold individual extents, but itself provide Extent-like capabilities Useful in cases of c...
Definition: BufferUtils.hh:109
size_t bytesContained() const
Definition: BufferUtils.cc:124
size_t size() const
number of extent elements
Definition: BufferUtils.hh:122
void push_back(const Extent &in)
Definition: BufferUtils.cc:101
Combine requests into single reads accoriding to some basic rules. Read a minimum amount of data (2Mi...
Passthrough implementation. Convertes the ReadV requests to extents and makes the request....
virtual ssize_t Read(off_t offset, size_t blen)
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual ssize_t Write(const void *buff, off_t offset, size_t blen)
virtual int Fstat(struct stat *buff)
virtual int Ftruncate(unsigned long long)
virtual int Fsync(void)
virtual ssize_t ReadRaw(void *, off_t, size_t)
virtual int Close(long long *retsz=0)
virtual int getFileDescriptor() const
virtual int Fsync(void)
virtual int Close(long long *retsz=0)
std::atomic< long > m_timer_longest
number of reads
virtual ssize_t Read(off_t offset, size_t blen)
std::atomic< long > m_timer_size
number of reads
std::unique_ptr< XrdCephBuffer::IXrdCephReadVAdapter > m_readVAdapter
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
std::atomic< long > m_timer_read_ns
virtual ssize_t Write(const void *buff, off_t offset, size_t blen)
std::atomic< long > m_timer_count
timer for the reads against ceph
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
XrdCephOssReadVFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF, const std::string &algname)
virtual int Ftruncate(unsigned long long)
virtual ssize_t ReadRaw(void *, off_t, size_t)
XrdCephOssFile * m_xrdOssDF
virtual int Fstat(struct stat *buff)
int fd
Definition: XrdOss.hh:455
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...
Definition: BufferUtils.hh:29
std::vector< Extent > ExtentContainer
Container defintion for Extents Typedef to provide a container of extents as a simple stl vector cont...
Definition: BufferUtils.hh:99
long long offset
Definition: XrdOucIOVec.hh:42