XRootD
XrdTpcMultistream.cc
Go to the documentation of this file.
1 
5 #ifdef XRD_CHUNK_RESP
6 
7 #include "XrdTpcTPC.hh"
8 #include "XrdTpcState.hh"
9 #include "XrdTpcCurlMulti.hh"
10 
11 #include "XrdSys/XrdSysError.hh"
12 
13 #include <curl/curl.h>
14 
15 #include <algorithm>
16 #include <sstream>
17 #include <stdexcept>
18 
19 
20 using namespace TPC;
21 
// Exception raised when the libcurl multi-handle needed for a multistream
// transfer cannot be created.
//
// It derives from std::runtime_error so generic handlers still see it, but it
// is a distinct type so that RunCurlWithStreams can catch it first and answer
// with a simple (non-chunked) 500 response.
class CurlHandlerSetupError : public std::runtime_error {
public:
    // msg: human-readable description, retrievable through what().
    CurlHandlerSetupError(const std::string &msg) :
        std::runtime_error(msg)
    {}

    // The dynamic exception specification `throw ()` is deprecated and was
    // removed from the language in C++20; `noexcept` states the same guarantee.
    ~CurlHandlerSetupError() noexcept override = default;
};
30 
31 namespace {
32 class MultiCurlHandler {
33 public:
34  MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
35  m_handle(curl_multi_init()),
36  m_states(states),
37  m_log(log),
38  m_bytes_transferred(0),
39  m_error_code(0),
40  m_status_code(0)
41  {
42  if (m_handle == NULL) {
43  throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
44  }
45  m_avail_handles.reserve(states.size());
46  m_active_handles.reserve(states.size());
47  for (std::vector<State*>::const_iterator state_iter = states.begin();
48  state_iter != states.end();
49  state_iter++) {
50  m_avail_handles.push_back((*state_iter)->GetHandle());
51  }
52  }
53 
54  ~MultiCurlHandler()
55  {
56  if (!m_handle) {return;}
57  for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
58  it != m_active_handles.end();
59  it++) {
60  curl_multi_remove_handle(m_handle, *it);
61  }
62  curl_multi_cleanup(m_handle);
63  }
64 
65  MultiCurlHandler(const MultiCurlHandler &) = delete;
66 
67  CURLM *Get() const {return m_handle;}
68 
69  void FinishCurlXfer(CURL *curl) {
70  CURLMcode mres = curl_multi_remove_handle(m_handle, curl);
71  if (mres) {
72  std::stringstream ss;
73  ss << "Failed to remove transfer from set: "
74  << curl_multi_strerror(mres);
75  throw std::runtime_error(ss.str());
76  }
77  for (std::vector<State*>::iterator state_iter = m_states.begin();
78  state_iter != m_states.end();
79  state_iter++) {
80  if (curl == (*state_iter)->GetHandle()) {
81  m_bytes_transferred += (*state_iter)->BytesTransferred();
82  int error_code = (*state_iter)->GetErrorCode();
83  if (error_code && !m_error_code) {
84  m_error_code = error_code;
85  m_error_message = (*state_iter)->GetErrorMessage();
86  }
87  int status_code = (*state_iter)->GetStatusCode();
88  if (status_code >= 400 && !m_status_code) {
89  m_status_code = status_code;
90  m_error_message = (*state_iter)->GetErrorMessage();
91  }
92  (*state_iter)->ResetAfterRequest();
93  break;
94  }
95  }
96  for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
97  iter != m_active_handles.end();
98  ++iter)
99  {
100  if (*iter == curl) {
101  m_active_handles.erase(iter);
102  break;
103  }
104  }
105  m_avail_handles.push_back(curl);
106  }
107 
108  off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size,
109  int &running_handles) {
110  bool started_new_xfer = false;
111  do {
112  size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
113  if (xfer_size == 0) {return current_offset;}
114  if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
115  // In this case, we need to start new transfers but weren't able to.
116  if (running_handles == 0) {
117  if (!CanStartTransfer(true)) {
118  m_log.Emsg("StartTransfers", "Unable to start transfers.");
119  }
120  }
121  break;
122  } else {
123  running_handles += 1;
124  }
125  current_offset += xfer_size;
126  } while (true);
127  return current_offset;
128  }
129 
130  int Flush() {
131  int last_error = 0;
132  for (std::vector<State*>::iterator state_it = m_states.begin();
133  state_it != m_states.end();
134  state_it++)
135  {
136  int error = (*state_it)->Flush();
137  if (error) {last_error = error;}
138  }
139  return last_error;
140  }
141 
142  off_t BytesTransferred() const {
143  return m_bytes_transferred;
144  }
145 
146  int GetStatusCode() const {
147  return m_status_code;
148  }
149 
150  int GetErrorCode() const {
151  return m_error_code;
152  }
153 
154  void SetErrorCode(int error_code) {
155  m_error_code = error_code;
156  }
157 
158  std::string GetErrorMessage() const {
159  return m_error_message;
160  }
161 
162  void SetErrorMessage(const std::string &error_msg) {
163  m_error_message = error_msg;
164  }
165 
166 private:
167 
168  bool StartTransfer(off_t offset, size_t size) {
169  if (!CanStartTransfer(false)) {return false;}
170  for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
171  handle_it != m_avail_handles.end();
172  handle_it++) {
173  for (std::vector<State*>::iterator state_it = m_states.begin();
174  state_it != m_states.end();
175  state_it++) {
176  if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
177  (*state_it)->SetTransferParameters(offset, size);
178  ActivateHandle(**state_it);
179  return true;
180  }
181  }
182  }
183  return false;
184  }
185 
186  void ActivateHandle(State &state) {
187  CURL *curl = state.GetHandle();
188  m_active_handles.push_back(curl);
189  CURLMcode mres;
190  mres = curl_multi_add_handle(m_handle, curl);
191  if (mres) {
192  std::stringstream ss;
193  ss << "Failed to add transfer to libcurl multi-handle"
194  << curl_multi_strerror(mres);
195  throw std::runtime_error(ss.str());
196  }
197  for (auto iter = m_avail_handles.begin();
198  iter != m_avail_handles.end();
199  ++iter)
200  {
201  if (*iter == curl) {
202  m_avail_handles.erase(iter);
203  break;
204  }
205  }
206  }
207 
208  bool CanStartTransfer(bool log_reason) const {
209  size_t idle_handles = m_avail_handles.size();
210  size_t transfer_in_progress = 0;
211  for (std::vector<State*>::const_iterator state_iter = m_states.begin();
212  state_iter != m_states.end();
213  state_iter++) {
214  for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
215  handle_iter != m_active_handles.end();
216  handle_iter++) {
217  if (*handle_iter == (*state_iter)->GetHandle()) {
218  transfer_in_progress += (*state_iter)->BodyTransferInProgress();
219  break;
220  }
221  }
222  }
223  if (!idle_handles) {
224  if (log_reason) {
225  m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
226  }
227  return false;
228  }
229  ssize_t available_buffers = m_states[0]->AvailableBuffers();
230  // To be conservative, set aside buffers for any transfers that have been activated
231  // but don't have their first responses back yet.
232  available_buffers -= (m_active_handles.size() - transfer_in_progress);
233  if (log_reason && (available_buffers == 0)) {
234  std::stringstream ss;
235  ss << "Unable to start transfers as no buffers are available. Available buffers: " <<
236  m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
237  << ", Transfers in progress: " << transfer_in_progress;
238  m_log.Emsg("CanStartTransfer", ss.str().c_str());
239  if (m_states[0]->AvailableBuffers() == 0) {
240  m_states[0]->DumpBuffers();
241  }
242  }
243  return available_buffers > 0;
244  }
245 
246  CURLM *m_handle;
247  std::vector<CURL *> m_avail_handles;
248  std::vector<CURL *> m_active_handles;
249  std::vector<State*> &m_states;
250  XrdSysError &m_log;
251  off_t m_bytes_transferred;
252  int m_error_code;
253  int m_status_code;
254  std::string m_error_message;
255 };
256 }
257 
258 
259 int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
260  size_t streams, std::vector<State*> &handles,
261  std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
262 {
263  int result;
264  bool success;
265  CURL *curl = state.GetHandle();
266  if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
267  return result;
268  }
269  off_t content_size = state.GetContentLength();
270  off_t current_offset = 0;
271 
272  state.ResetAfterRequest();
273 
274  size_t concurrency = streams * m_pipelining_multiplier;
275 
276  handles.reserve(concurrency);
277  handles.push_back(new State());
278  handles[0]->Move(state);
279  for (size_t idx = 1; idx < concurrency; idx++) {
280  handles.push_back(handles[0]->Duplicate());
281  curl_handles.emplace_back(handles.back()->GetHandle());
282  }
283 
284  // Notify the packet marking manager that the transfer will start after this point
285  rec.pmarkManager.startTransfer();
286 
287  // Create the multi-handle and add in the current transfer to it.
288  MultiCurlHandler mch(handles, m_log);
289  CURLM *multi_handle = mch.Get();
290 
291 #ifdef USE_PIPELINING
292  curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
293  curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
294 #endif
295 
296  // Start response to client prior to the first call to curl_multi_perform
297  int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
298  if (retval) {
299  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
300  "Failed to send the initial response to the TPC client");
301  return retval;
302  } else {
303  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
304  "Initial transfer response sent to the TPC client");
305  }
306 
307  // Start assigning transfers
308  int running_handles = 0;
309  current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles);
310 
311  // Transfer loop: use curl to actually run the transfer, but periodically
312  // interrupt things to send back performance updates to the client.
313  time_t last_marker = 0;
314  // Track the time since the transfer last made progress
315  off_t last_advance_bytes = 0;
316  time_t last_advance_time = time(NULL);
317  time_t transfer_start = last_advance_time;
318  CURLcode res = static_cast<CURLcode>(-1);
319  CURLMcode mres = CURLM_OK;
320  do {
321  time_t now = time(NULL);
322  time_t next_marker = last_marker + m_marker_period;
323  if (now >= next_marker) {
324  if (current_offset > last_advance_bytes) {
325  last_advance_bytes = current_offset;
326  last_advance_time = now;
327  }
328  if (SendPerfMarker(req, rec, handles, current_offset)) {
329  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
330  "Failed to send a perf marker to the TPC client");
331  return -1;
332  }
333  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
334  if (now > last_advance_time + timeout) {
335  const char *log_prefix = rec.log_prefix.c_str();
336  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
337 
338  mch.SetErrorCode(10);
339  std::stringstream ss;
340  ss << "Transfer failed because no bytes have been "
341  << (tpc_pull ? "received from the source (pull mode) in "
342  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
343  mch.SetErrorMessage(ss.str());
344  break;
345  }
346  last_marker = now;
347  }
348 
349  mres = curl_multi_perform(multi_handle, &running_handles);
350  if (mres == CURLM_CALL_MULTI_PERFORM) {
351  // curl_multi_perform should be called again immediately. On newer
352  // versions of curl, this is no longer used.
353  continue;
354  } else if (mres != CURLM_OK) {
355  break;
356  }
357 
358  rec.pmarkManager.beginPMarks();
359 
360 
361  // Harvest any messages, looking for CURLMSG_DONE.
362  CURLMsg *msg;
363  do {
364  int msgq = 0;
365  msg = curl_multi_info_read(multi_handle, &msgq);
366  if (msg && (msg->msg == CURLMSG_DONE)) {
367  CURL *easy_handle = msg->easy_handle;
368  res = msg->data.result;
369  mch.FinishCurlXfer(easy_handle);
370  // If any requests fail, cut off the entire transfer.
371  if (res != CURLE_OK) {
372  break;
373  }
374  }
375  } while (msg);
376  if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
377  std::stringstream ss;
378  ss << "Breaking loop due to failed curl transfer: " << curl_easy_strerror(res);
379  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
380  ss.str());
381  break;
382  }
383 
384  if (running_handles < static_cast<int>(concurrency)) {
385  // Issue new transfers if there is still pending work to do.
386  // Otherwise, continue running until there are no handles left.
387  if (current_offset != content_size) {
388  current_offset = mch.StartTransfers(current_offset, content_size,
389  m_block_size, running_handles);
390  if (!running_handles) {
391  std::stringstream ss;
392  ss << "No handles are able to run. Streams=" << streams << ", concurrency="
393  << concurrency;
394 
395  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
396  }
397  } else if (running_handles == 0) {
398  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
399  "Unable to start new transfers; breaking loop.");
400  break;
401  }
402  }
403 
404  int64_t max_sleep_time = next_marker - time(NULL);
405  if (max_sleep_time <= 0) {
406  continue;
407  }
408  int fd_count;
409 #ifdef HAVE_CURL_MULTI_WAIT
410  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
411  &fd_count);
412 #else
413  mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000,
414  &fd_count);
415 #endif
416  if (mres != CURLM_OK) {
417  break;
418  }
419  } while (running_handles);
420 
421  if (mres != CURLM_OK) {
422  std::stringstream ss;
423  ss << "Internal libcurl multi-handle error: "
424  << curl_multi_strerror(mres);
425  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
426  throw std::runtime_error(ss.str());
427  }
428 
429  // Harvest any messages, looking for CURLMSG_DONE.
430  CURLMsg *msg;
431  do {
432  int msgq = 0;
433  msg = curl_multi_info_read(multi_handle, &msgq);
434  if (msg && (msg->msg == CURLMSG_DONE)) {
435  CURL *easy_handle = msg->easy_handle;
436  mch.FinishCurlXfer(easy_handle);
437  if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
438  res = msg->data.result; // Transfer result will be examined below.
439  }
440  } while (msg);
441 
442  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
443  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
444  "Internal state error in libcurl");
445  throw std::runtime_error("Internal state error in libcurl");
446  }
447 
448  mch.Flush();
449 
450  rec.bytes_transferred = mch.BytesTransferred();
451  rec.tpc_status = mch.GetStatusCode();
452 
453  // Generate the final response back to the client.
454  std::stringstream ss;
455  success = false;
456  if (mch.GetStatusCode() >= 400) {
457  std::string err = mch.GetErrorMessage();
458  std::stringstream ss2;
459  ss2 << "Remote side failed with status code " << mch.GetStatusCode();
460  if (!err.empty()) {
461  std::replace(err.begin(), err.end(), '\n', ' ');
462  ss2 << "; error message: \"" << err << "\"";
463  }
464  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
465  ss << generateClientErr(ss2, rec);
466  } else if (mch.GetErrorCode()) {
467  std::string err = mch.GetErrorMessage();
468  if (err.empty()) {err = "(no error message provided)";}
469  else {std::replace(err.begin(), err.end(), '\n', ' ');}
470  std::stringstream ss2;
471  ss2 << "Error when interacting with local filesystem: " << err;
472  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
473  ss << generateClientErr(ss2, rec);
474  } else if (res != CURLE_OK) {
475  std::stringstream ss2;
476  ss2 << "Request failed when processing";
477  std::stringstream ss3;
478  ss3 << ss2.str() << ":" << curl_easy_strerror(res);
479  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss3.str());
480  ss << generateClientErr(ss2, rec, res);
481  } else if (current_offset != content_size) {
482  std::stringstream ss2;
483  ss2 << "Internal logic error led to early abort; current offset is " <<
484  current_offset << " while full size is " << content_size;
485  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
486  ss << generateClientErr(ss2, rec);
487  } else {
488  if (!handles[0]->Finalize()) {
489  std::stringstream ss2;
490  ss2 << "Failed to finalize and close file handle.";
491  ss << generateClientErr(ss2, rec);
492  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
493  ss2.str());
494  } else {
495  ss << "success: Created";
496  success = true;
497  }
498  }
499 
500  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
501  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
502  "Failed to send last update to remote client");
503  return retval;
504  } else if (success) {
505  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
506  rec.status = 0;
507  }
508  return req.ChunkResp(NULL, 0);
509 }
510 
511 
512 int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
513  size_t streams, TPCLogRecord &rec)
514 {
515  std::vector<ManagedCurlHandle> curl_handles;
516  std::vector<State*> handles;
517  std::stringstream err_ss;
518  try {
519  int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
520  for (std::vector<State*>::iterator state_iter = handles.begin();
521  state_iter != handles.end();
522  state_iter++) {
523  delete *state_iter;
524  }
525  return retval;
526  } catch (CurlHandlerSetupError &e) {
527  for (std::vector<State*>::iterator state_iter = handles.begin();
528  state_iter != handles.end();
529  state_iter++) {
530  delete *state_iter;
531  }
532 
533  rec.status = 500;
534  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
535  std::stringstream ss;
536  ss << e.what();
537  err_ss << generateClientErr(ss, rec);
538  return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
539  } catch (std::runtime_error &e) {
540  for (std::vector<State*>::iterator state_iter = handles.begin();
541  state_iter != handles.end();
542  state_iter++) {
543  delete *state_iter;
544  }
545 
546  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
547  std::stringstream ss;
548  ss << e.what();
549  err_ss << generateClientErr(ss, rec);
550  int retval;
551  if ((retval = req.ChunkResp(err_ss.str().c_str(), 0))) {
552  return retval;
553  }
554  return req.ChunkResp(NULL, 0);
555  }
556 }
557 
558 #endif // XRD_CHUNK_RESP
#define Duplicate(x, y)
@ Info
CURLMcode curl_multi_wait_impl(CURLM *multi_handle, int timeout_ms, int *numfds)
void CURL
Definition: XrdTpcState.hh:14
CURL * GetHandle() const
Definition: XrdTpcState.hh:104
void ResetAfterRequest()
Definition: XrdTpcState.cc:118
int GetErrorCode() const
Definition: XrdTpcState.hh:92
off_t GetContentLength() const
Definition: XrdTpcState.hh:90
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.