XRootD
XrdCephBulkAioRead.cc
Go to the documentation of this file.
1 #include "XrdCephBulkAioRead.hh"
2 
3 
4 bulkAioRead::bulkAioRead(librados::IoCtx* ct, logfunc_pointer logwrapper, CephFileRef* fileref) {
13  context = ct;
14  file_ref = fileref;
15  log_func = logwrapper;
16 }
17 
// NOTE(review): the line that opens this body is missing from this listing.
// Presumably this is the destructor of bulkAioRead -- confirm against the
// original file. The body does nothing except call clear().
22  clear();
23 }
24 
// NOTE(review): the line that opens this body is missing from this listing.
// Presumably this is bulkAioRead::clear(), which other bodies in this file
// call -- confirm against the original file.
// Empties both member containers: the per-object operations first, then the
// per-request buffers.
29  operations.clear();
30  buffers.clear();
31 }
32 
33 int bulkAioRead::addRequest(size_t obj_idx, char* out_buf, size_t size, off64_t offset) {
51  try{
52  auto &op_data = operations[obj_idx];
53  //When we start using C++17, the next two lines can be merged
54  buffers.emplace_back(out_buf);
55  auto &buf = buffers.back();
56  op_data.ceph_read_op.read(offset, size, &buf.bl, &buf.rc);
57  } catch (std::bad_alloc&) {
58  log_func((char*)"Memory allocation failed while reading file %s", file_ref->name.c_str());
59  return -ENOMEM;
60  }
61  return 0;
62 }
63 
74  for (auto &op_data: operations) {
75  size_t obj_idx = op_data.first;
76  //16 bytes for object hex number, 1 for dot and 1 for null-terminator
77  char object_suffix[18];
78  int sp_bytes_written;
79  sp_bytes_written = snprintf(object_suffix, sizeof(object_suffix), ".%016zx", obj_idx);
80  if (sp_bytes_written >= (int) sizeof(object_suffix)) {
81  log_func((char*)"Can not fit object suffix into buffer for file %s -- too big\n", file_ref->name.c_str());
82  return -EFBIG;
83  }
84 
85  std::string obj_name;
86  try {
87  obj_name = file_ref->name + std::string(object_suffix);
88  } catch (std::bad_alloc&) {
89  log_func((char*)"Can not create object string for file %s)", file_ref->name.c_str());
90  return -ENOMEM;
91  }
92  context->aio_operate(obj_name, op_data.second.cmpl.use(), &op_data.second.ceph_read_op, 0);
93  }
94 
95  for (auto &op_data: operations) {
96  op_data.second.cmpl.wait_for_complete();
97  int rval = op_data.second.cmpl.get_return_value();
98  /*
99  * Optimization is possible here: cancel all remaining read operations after the failure.
100  * One way to do so is the following: add context as an argument to the `use` method of CmplPtr.
101  * Then inside the class this pointer can be saved and used by the destructor to call
102  * `aio_cancel` (and probably `wait_for_complete`) before releasing the completion.
103  * Though one need to clarify whether it is necessary to cal `wait_for_complete` after
104  * `aio_cancel` (i.e. may the status variable/bufferlist still be written to or not).
105  */
106  if (rval < 0) {
107  log_func((char*)"Read of the object %ld for file %s failed", op_data.first, file_ref->name.c_str());
108  return rval;
109  }
110  }
111  return 0;
112 }
113 
126  ssize_t res = 0;
127  for (ReadOpData &op_data: buffers) {
128  if (op_data.rc < 0) {
129  //Is it possible to get here?
130  log_func((char*)"One of the reads failed with rc %d", op_data.rc);
131  return op_data.rc;
132  }
133  op_data.bl.begin().copy(op_data.bl.length(), op_data.out_buf);
134  res += op_data.bl.length();
135  }
136  //We should clear used completions to allow new operations
137  clear();
138  return res;
139 }
140 
141 int bulkAioRead::read(void* out_buf, size_t req_size, off64_t offset) {
158  if (req_size == 0) {
159  log_func((char*)"Zero-length read request for file %s, probably client error", file_ref->name.c_str());
160  return 0;
161  }
162 
163  char* const buf_start_ptr = (char*) out_buf;
164 
165  size_t object_size = file_ref->objectSize;
166  //The amount of bytes that is yet to be read
167  size_t to_read = req_size;
168  //block means ceph object here
169  size_t start_block = offset / object_size;
170  size_t buf_pos = 0;
171  size_t chunk_start = offset % object_size;
172 
173  while (to_read > 0) {
174  size_t chunk_len = std::min(to_read, object_size - chunk_start);
175 
176  if (buf_pos >= req_size) {
177  log_func((char*)"Internal bug! Attempt to read %lu data for block (%lu, %lu) of file %s\n", buf_pos, offset, req_size, file_ref->name.c_str());
178  return -EINVAL;
179  }
180 
181  int rc = addRequest(start_block, buf_start_ptr + buf_pos, chunk_len, chunk_start);
182  if (rc < 0) {
183  log_func((char*)"Unable to submit async read request, rc=%d\n", rc);
184  return rc;
185  }
186 
187  buf_pos += chunk_len;
188 
189  start_block++;
190  chunk_start = 0;
191  if (chunk_len > to_read) {
192  log_func((char*)"Internal bug! Read %lu bytes, more than expected %lu bytes for block (%lu, %lu) of file %s\n", chunk_len, to_read, offset, req_size, file_ref->name.c_str());
193  return -EINVAL;
194  }
195  to_read = to_read - chunk_len;
196  }
197  return 0;
198 }
static void logwrapper(char *format, va_list argp)
Definition: XrdCephOss.cc:62
ssize_t get_results()
int read(void *out_buf, size_t size, off64_t offset)
bulkAioRead(librados::IoCtx *ct, logfunc_pointer ptr, CephFileRef *fileref)
int submit_and_wait_for_complete()
std::string name
unsigned long long objectSize