XRootD
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClURL.hh"
32 #include "XrdCl/XrdClUtils.hh"
34 #include "XrdCl/XrdClJobManager.hh"
35 #include "XrdCl/XrdClSIDManager.hh"
39 #include "XrdCl/XrdClSocket.hh"
40 #include "XrdCl/XrdClTls.hh"
41 
42 #include "XrdOuc/XrdOucCRC.hh"
43 
44 #include "XrdSys/XrdSysPlatform.hh" // same as above
45 #include "XrdSys/XrdSysAtomics.hh"
46 #include "XrdSys/XrdSysPthread.hh"
47 #include <memory>
48 #include <sstream>
49 #include <numeric>
50 
51 namespace
52 {
53  //----------------------------------------------------------------------------
54  // We need an extra task what will run the handler in the future, because
55  // tasks get deleted and we need the handler
56  //----------------------------------------------------------------------------
57  class WaitTask: public XrdCl::Task
58  {
59  public:
60  WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
61  {
62  std::ostringstream o;
63  o << "WaitTask for: 0x" << handler->GetRequest();
64  SetName( o.str() );
65  }
66 
67  virtual time_t Run( time_t now )
68  {
69  pHandler->WaitDone( now );
70  return 0;
71  }
72  private:
73  XrdCl::XRootDMsgHandler *pHandler;
74  };
75 }
76 
77 namespace XrdCl
78 {
79  //----------------------------------------------------------------------------
80  // Delegate the response handling to the thread-pool
81  //----------------------------------------------------------------------------
82  class HandleRspJob: public XrdCl::Job
83  {
84  public:
85  HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
86  {
87 
88  }
89 
90  virtual ~HandleRspJob()
91  {
92 
93  }
94 
95  virtual void Run( void *arg )
96  {
97  pHandler->HandleResponse();
98  delete this;
99  }
100  private:
101  XrdCl::XRootDMsgHandler *pHandler;
102  };
103 
104  //----------------------------------------------------------------------------
105  // Examine an incoming message, and decide on the action to be taken
106  //----------------------------------------------------------------------------
107  uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
108  {
109  //--------------------------------------------------------------------------
110  // if the MsgHandler is already being used to process another request
111  // (kXR_oksofar) we need to wait
112  //--------------------------------------------------------------------------
113  if( pOksofarAsAnswer )
114  {
115  XrdSysCondVarHelper lck( pCV );
116  while( pResponse ) pCV.Wait();
117  }
118  else
119  {
120  if( pResponse )
121  {
122  Log *log = DefaultEnv::GetLog();
123  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
124  "it already owns a response: 0x%x (message: %s ).",
125  pUrl.GetHostId().c_str(), this,
126  pRequest->GetDescription().c_str() );
127  }
128  }
129 
130  if( msg->GetSize() < 8 )
131  return Ignore;
132 
133  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
134  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
135  uint16_t status = 0;
136  uint32_t dlen = 0;
137 
138  //--------------------------------------------------------------------------
139  // We only care about async responses, but those are extracted now
140  // in the SocketHandler.
141  //--------------------------------------------------------------------------
142  if( rsp->hdr.status == kXR_attn )
143  {
144  return Ignore;
145  }
146  //--------------------------------------------------------------------------
147  // We got a sync message - check if it belongs to us
148  //--------------------------------------------------------------------------
149  else
150  {
151  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
152  rsp->hdr.streamid[1] != req->header.streamid[1] )
153  return Ignore;
154 
155  status = rsp->hdr.status;
156  dlen = rsp->hdr.dlen;
157  }
158 
159  //--------------------------------------------------------------------------
160  // We take the ownership of the message and decide what we will do
161  // with the handler itself, the options are:
162  // 1) we want to either read in raw mode (the Raw flag) or have the message
163  // body reconstructed for us by the TransportHandler by the time
164  // Process() is called (default, no extra flag)
165  // 2) we either got a full response in which case we don't want to be
166  // notified about anything anymore (RemoveHandler) or we got a partial
167  // answer and we need to wait for more (default, no extra flag)
168  //--------------------------------------------------------------------------
169  pResponse = msg;
170  pBodyReader->SetDataLength( dlen );
171 
172  Log *log = DefaultEnv::GetLog();
173  switch( status )
174  {
175  //------------------------------------------------------------------------
176  // Handle the cached cases
177  //------------------------------------------------------------------------
178  case kXR_error:
179  case kXR_redirect:
180  case kXR_wait:
181  return RemoveHandler;
182 
183  case kXR_waitresp:
184  {
185  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
186  "message %s", pUrl.GetHostId().c_str(),
187  pRequest->GetDescription().c_str() );
188 
189  pResponse.reset();
190  return Ignore; // This must be handled synchronously!
191  }
192 
193  //------------------------------------------------------------------------
194  // Handle the potential raw cases
195  //------------------------------------------------------------------------
196  case kXR_ok:
197  {
198  //----------------------------------------------------------------------
199  // For kXR_read we read in raw mode
200  //----------------------------------------------------------------------
201  uint16_t reqId = ntohs( req->header.requestid );
202  if( reqId == kXR_read )
203  {
204  return Raw | RemoveHandler;
205  }
206 
207  //----------------------------------------------------------------------
208  // kXR_readv is the same as kXR_read
209  //----------------------------------------------------------------------
210  if( reqId == kXR_readv )
211  {
212  return Raw | RemoveHandler;
213  }
214 
215  //----------------------------------------------------------------------
216  // For everything else we just take what we got
217  //----------------------------------------------------------------------
218  return RemoveHandler;
219  }
220 
221  //------------------------------------------------------------------------
222  // kXR_oksofars are special, they are not full responses, so we reset
223  // the response pointer to 0 and add the message to the partial list
224  //------------------------------------------------------------------------
225  case kXR_oksofar:
226  {
227  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
228  "%s", pUrl.GetHostId().c_str(),
229  pRequest->GetDescription().c_str() );
230 
231  if( !pOksofarAsAnswer )
232  {
233  pPartialResps.emplace_back( std::move( pResponse ) );
234  }
235 
236  //----------------------------------------------------------------------
237  // For kXR_read we either read in raw mode if the message has not
238  // been fully reconstructed already, if it has, we adjust
239  // the buffer offset to prepare for the next one
240  //----------------------------------------------------------------------
241  uint16_t reqId = ntohs( req->header.requestid );
242  if( reqId == kXR_read )
243  {
244  pTimeoutFence.store( true, std::memory_order_relaxed );
245  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
246  }
247 
248  //----------------------------------------------------------------------
249  // kXR_readv is similar to read, except that the payload is different
250  //----------------------------------------------------------------------
251  if( reqId == kXR_readv )
252  {
253  pTimeoutFence.store( true, std::memory_order_relaxed );
254  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
255  }
256 
257  return ( pOksofarAsAnswer ? None : NoProcess );
258  }
259 
260  case kXR_status:
261  {
262  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
263  "%s", pUrl.GetHostId().c_str(),
264  pRequest->GetDescription().c_str() );
265 
266  uint16_t reqId = ntohs( req->header.requestid );
267  if( reqId == kXR_pgwrite )
268  {
269  //--------------------------------------------------------------------
270  // In case of pgwrite by definition this wont be a partial response
271  // so we can already remove the handler from the in-queue
272  //--------------------------------------------------------------------
273  return RemoveHandler;
274  }
275 
276  //----------------------------------------------------------------------
277  // Otherwise (pgread), first of all we need to read the body of the
278  // kXR_status response, we can handle the raw data (if any) only after
279  // we have the whole kXR_status body
280  //----------------------------------------------------------------------
281  pTimeoutFence.store( true, std::memory_order_relaxed );
282  return None;
283  }
284 
285  //------------------------------------------------------------------------
286  // Default
287  //------------------------------------------------------------------------
288  default:
289  return RemoveHandler;
290  }
291  return RemoveHandler;
292  }
293 
294  //----------------------------------------------------------------------------
295  // Reexamine the incoming message, and decide on the action to be taken
296  //----------------------------------------------------------------------------
298  {
299  if( !pResponse )
300  return 0;
301 
302  Log *log = DefaultEnv::GetLog();
303  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
304 
305  //--------------------------------------------------------------------------
306  // Additional action is only required for kXR_status
307  //--------------------------------------------------------------------------
308  if( rsp->hdr.status != kXR_status ) return 0;
309 
310  //--------------------------------------------------------------------------
311  // Ignore malformed status response
312  //--------------------------------------------------------------------------
313  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
314  {
315  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
316  return Corrupted;
317  }
318 
319  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
320  uint16_t reqId = ntohs( req->header.requestid );
321  //--------------------------------------------------------------------------
322  // Unmarshal the status body
323  //--------------------------------------------------------------------------
324  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
325 
326  if( !st.IsOK() && st.code == errDataError )
327  {
328  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
329  st.GetErrorMessage().c_str() );
330  return Corrupted;
331  }
332 
333  if( !st.IsOK() )
334  {
335  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
336  pUrl.GetHostId().c_str() );
337  pStatus = st;
338  HandleRspOrQueue();
339  return Ignore;
340  }
341 
342  //--------------------------------------------------------------------------
343  // Common handling for partial results
344  //--------------------------------------------------------------------------
345  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
347  {
348  pPartialResps.push_back( std::move( pResponse ) );
349  }
350 
351  //--------------------------------------------------------------------------
352  // Decide the actions that we need to take
353  //--------------------------------------------------------------------------
354  uint16_t action = 0;
355  if( reqId == kXR_pgread )
356  {
357  //----------------------------------------------------------------------
358  // The message contains only Status header and body but no raw data
359  //----------------------------------------------------------------------
360  if( !pPageReader )
361  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
362  pPageReader->SetRsp( rspst );
363 
364  action |= Raw;
365 
367  action |= NoProcess;
368  else
369  action |= RemoveHandler;
370  }
371  else if( reqId == kXR_pgwrite )
372  {
373  // if data corruption has been detected on the server side we will
374  // send some additional data pointing to the pages that need to be
375  // retransmitted
376  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
377  pResponse->GetCursor() )
378  action |= More;
379  }
380 
381  return action;
382  }
383 
384  //----------------------------------------------------------------------------
385  // Get handler sid
386  //----------------------------------------------------------------------------
387  uint16_t XRootDMsgHandler::GetSid() const
388  {
389  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
390  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
391  }
392 
393  //----------------------------------------------------------------------------
395  //----------------------------------------------------------------------------
397  {
398  Log *log = DefaultEnv::GetLog();
399 
400  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
401 
402  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
403 
404  //--------------------------------------------------------------------------
405  // If it is a local file, it can be only a metalink redirector
406  //--------------------------------------------------------------------------
407  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
408  pHosts->back().protocol = kXR_PROTOCOLVERSION;
409 
410  //--------------------------------------------------------------------------
411  // We got an answer, check who we were talking to
412  //--------------------------------------------------------------------------
413  else
414  {
415  AnyObject qryResult;
416  int *qryResponse = 0;
417  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
418  qryResult.Get( qryResponse );
419  pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
420  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
421  qryResult.Get( qryResponse );
422  pHosts->back().protocol = *qryResponse; delete qryResponse;
423  }
424 
425  //--------------------------------------------------------------------------
426  // Process the message
427  //--------------------------------------------------------------------------
428  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
429  if( !st.IsOK() )
430  {
431  pStatus = Status( stFatal, errInvalidMessage );
432  HandleResponse();
433  return;
434  }
435 
436  //--------------------------------------------------------------------------
437  // we have an response for the message so it's not in fly anymore
438  //--------------------------------------------------------------------------
439  pMsgInFly = false;
440 
441  //--------------------------------------------------------------------------
442  // Reset the aggregated wait (used to omit wait response in case of Metalink
443  // redirector)
444  //--------------------------------------------------------------------------
445  if( rsp->hdr.status != kXR_wait )
446  pAggregatedWaitTime = 0;
447 
448  switch( rsp->hdr.status )
449  {
450  //------------------------------------------------------------------------
451  // kXR_ok - we're done here
452  //------------------------------------------------------------------------
453  case kXR_ok:
454  {
455  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
456  pUrl.GetHostId().c_str(),
457  pRequest->GetDescription().c_str() );
458  pStatus = Status();
459  HandleResponse();
460  return;
461  }
462 
463  case kXR_status:
464  {
465  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
466  pUrl.GetHostId().c_str(),
467  pRequest->GetDescription().c_str() );
468  pStatus = Status();
469  HandleResponse();
470  return;
471  }
472 
473  //------------------------------------------------------------------------
474  // kXR_ok - we're serving partial result to the user
475  //------------------------------------------------------------------------
476  case kXR_oksofar:
477  {
478  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
479  pUrl.GetHostId().c_str(),
480  pRequest->GetDescription().c_str() );
481  pStatus = Status( stOK, suContinue );
482  HandleResponse();
483  return;
484  }
485 
486  //------------------------------------------------------------------------
487  // kXR_error - we've got a problem
488  //------------------------------------------------------------------------
489  case kXR_error:
490  {
491  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
492  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
493  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
494  "[%d] %s", pUrl.GetHostId().c_str(),
495  pRequest->GetDescription().c_str(), rsp->body.error.errnum,
496  errmsg );
497  delete [] errmsg;
498 
499  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
500  return;
501  }
502 
503  //------------------------------------------------------------------------
504  // kXR_redirect - they tell us to go elsewhere
505  //------------------------------------------------------------------------
506  case kXR_redirect:
507  {
508  if( rsp->hdr.dlen <= 4 )
509  {
510  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
511  pUrl.GetHostId().c_str() );
512  pStatus = Status( stError, errInvalidResponse );
513  HandleResponse();
514  return;
515  }
516 
517  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
518  urlInfoBuff[rsp->hdr.dlen-4] = 0;
519  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
520  std::string urlInfo = urlInfoBuff;
521  delete [] urlInfoBuff;
522  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
523  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
524  pRequest->GetDescription().c_str(), urlInfo.c_str(),
525  rsp->body.redirect.port );
526 
527  //----------------------------------------------------------------------
528  // Check if we can proceed
529  //----------------------------------------------------------------------
530  if( !pRedirectCounter )
531  {
532  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
533  "message %s, the last known error is: %s",
534  pUrl.GetHostId().c_str(),
535  pRequest->GetDescription().c_str(),
536  pLastError.ToString().c_str() );
537 
538 
539  pStatus = Status( stFatal, errRedirectLimit );
540  HandleResponse();
541  return;
542  }
543  --pRedirectCounter;
544 
545  //----------------------------------------------------------------------
546  // Keep the info about this server if we still need to find a load
547  // balancer
548  //----------------------------------------------------------------------
549  uint32_t flags = pHosts->back().flags;
550  if( !pHasLoadBalancer )
551  {
552  if( flags & kXR_isManager )
553  {
554  //------------------------------------------------------------------
555  // If the current server is a meta manager then it supersedes
556  // any existing load balancer, otherwise we assign a load-balancer
557  // only if it has not been already assigned
558  //------------------------------------------------------------------
559  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
560  {
561  pLoadBalancer = pHosts->back();
562  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
563  "as a load-balancer for message %s",
564  pUrl.GetHostId().c_str(),
565  pRequest->GetDescription().c_str() );
566  HostList::iterator it;
567  for( it = pHosts->begin(); it != pHosts->end(); ++it )
568  it->loadBalancer = false;
569  pHosts->back().loadBalancer = true;
570  }
571  }
572  }
573 
574  //----------------------------------------------------------------------
575  // If the redirect comes from a data server safe the URL because
576  // in case of a failure we will use it as the effective data server URL
577  // for the tried CGI opaque info
578  //----------------------------------------------------------------------
579  if( flags & kXR_isServer )
580  pEffectiveDataServerUrl = new URL( pHosts->back().url );
581 
582  //----------------------------------------------------------------------
583  // Build the URL and check it's validity
584  //----------------------------------------------------------------------
585  std::vector<std::string> urlComponents;
586  std::string newCgi;
587  Utils::splitString( urlComponents, urlInfo, "?" );
588 
589  std::ostringstream o;
590 
591  o << urlComponents[0];
592  if( rsp->body.redirect.port > 0 )
593  o << ":" << rsp->body.redirect.port << "/";
594  else if( rsp->body.redirect.port < 0 )
595  {
596  //--------------------------------------------------------------------
597  // check if the manager wants to enforce write recovery at himself
598  // (beware we are dealing here with negative flags)
599  //--------------------------------------------------------------------
600  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
601  pHosts->back().flags |= kXR_recoverWrts;
602 
603  //--------------------------------------------------------------------
604  // check if the manager wants to collapse the communication channel
605  // (the redirect host is to replace the current host)
606  //--------------------------------------------------------------------
607  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
608  {
609  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
610  pPostMaster->CollapseRedirect( pUrl, url );
611  }
612 
613  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
614  {
615  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
616  if( Utils::CheckEC( pRequest, url ) )
617  pRedirectAsAnswer = true;
618  }
619  }
620 
621  URL newUrl = URL( o.str() );
622  if( !newUrl.IsValid() )
623  {
624  pStatus = Status( stError, errInvalidRedirectURL );
625  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
626  pUrl.GetHostId().c_str(), urlInfo.c_str() );
627  HandleResponse();
628  return;
629  }
630 
631  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
632  newUrl.SetUserName( pUrl.GetUserName() );
633 
634  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
635  newUrl.SetPassword( pUrl.GetPassword() );
636 
637  //----------------------------------------------------------------------
638  // Forward any "xrd.*" params from the original client request also to
639  // the new redirection url
640  // Also, we need to preserve any "xrdcl.*' as they are important for
641  // our internal workflows.
642  //----------------------------------------------------------------------
643  std::ostringstream ossXrd;
644  const URL::ParamsMap &urlParams = pUrl.GetParams();
645 
646  for(URL::ParamsMap::const_iterator it = urlParams.begin();
647  it != urlParams.end(); ++it )
648  {
649  if( it->first.compare( 0, 4, "xrd." ) &&
650  it->first.compare( 0, 6, "xrdcl." ) )
651  continue;
652 
653  ossXrd << it->first << '=' << it->second << '&';
654  }
655 
656  std::string xrdCgi = ossXrd.str();
657  pRedirectUrl = newUrl.GetURL();
658 
659  URL cgiURL;
660  if( urlComponents.size() > 1 )
661  {
662  pRedirectUrl += "?";
663  pRedirectUrl += urlComponents[1];
664  std::ostringstream o;
665  o << "fake://fake:111//fake?";
666  o << urlComponents[1];
667 
668  if( urlComponents.size() == 3 )
669  o << '?' << urlComponents[2];
670 
671  if (!xrdCgi.empty())
672  {
673  o << '&' << xrdCgi;
674  pRedirectUrl += '&';
675  pRedirectUrl += xrdCgi;
676  }
677 
678  cgiURL = URL( o.str() );
679  }
680  else {
681  if (!xrdCgi.empty())
682  {
683  std::ostringstream o;
684  o << "fake://fake:111//fake?";
685  o << xrdCgi;
686  cgiURL = URL( o.str() );
687  pRedirectUrl += '?';
688  pRedirectUrl += xrdCgi;
689  }
690  }
691 
692  //----------------------------------------------------------------------
693  // Check if we need to return the URL as a response
694  //----------------------------------------------------------------------
695  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
696  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
697  !newUrl.IsLocalFile() )
698  pRedirectAsAnswer = true;
699 
700  if( pRedirectAsAnswer )
701  {
702  pStatus = Status( stError, errRedirect );
703  HandleResponse();
704  return;
705  }
706 
707  //----------------------------------------------------------------------
708  // Rewrite the message in a way required to send it to another server
709  //----------------------------------------------------------------------
710  newUrl.SetParams( cgiURL.GetParams() );
711  Status st = RewriteRequestRedirect( newUrl );
712  if( !st.IsOK() )
713  {
714  pStatus = st;
715  HandleResponse();
716  return;
717  }
718 
719  //----------------------------------------------------------------------
720  // Make sure we don't change the protocol by accident (root vs roots)
721  //----------------------------------------------------------------------
722  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
723  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
724  newUrl.SetProtocol( "roots" );
725 
726  //----------------------------------------------------------------------
727  // Send the request to the new location
728  //----------------------------------------------------------------------
729  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
730  return;
731  }
732 
733  //------------------------------------------------------------------------
734  // kXR_wait - we wait, and re-issue the request later
735  //------------------------------------------------------------------------
736  case kXR_wait:
737  {
738  uint32_t waitSeconds = 0;
739 
740  if( rsp->hdr.dlen >= 4 )
741  {
742  char *infoMsg = new char[rsp->hdr.dlen-3];
743  infoMsg[rsp->hdr.dlen-4] = 0;
744  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
745  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
746  "message %s: %s", pUrl.GetHostId().c_str(),
747  rsp->body.wait.seconds, pRequest->GetDescription().c_str(),
748  infoMsg );
749  delete [] infoMsg;
750  waitSeconds = rsp->body.wait.seconds;
751  }
752  else
753  {
754  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
755  "message %s", pUrl.GetHostId().c_str(),
756  pRequest->GetDescription().c_str() );
757  }
758 
759  pAggregatedWaitTime += waitSeconds;
760 
761  // We need a special case if the data node comes from metalink
762  // redirector. In this case it might make more sense to try the
763  // next entry in the Metalink than wait.
764  if( OmitWait( *pRequest, pLoadBalancer.url ) )
765  {
766  int maxWait = DefaultMaxMetalinkWait;
767  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
768  if( pAggregatedWaitTime > maxWait )
769  {
770  UpdateTriedCGI();
771  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
772  return;
773  }
774  }
775 
776  //----------------------------------------------------------------------
777  // Some messages require rewriting before they can be sent again
778  // after wait
779  //----------------------------------------------------------------------
780  Status st = RewriteRequestWait();
781  if( !st.IsOK() )
782  {
783  pStatus = st;
784  HandleResponse();
785  return;
786  }
787 
788  //----------------------------------------------------------------------
789  // Register a task to resend the message in some seconds, if we still
790  // have time to do that, and report a timeout otherwise
791  //----------------------------------------------------------------------
792  time_t resendTime = ::time(0)+waitSeconds;
793 
794  if( resendTime < pExpiration )
795  {
796  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: 0x%x (message: %s ).",
797  pUrl.GetHostId().c_str(), this,
798  pRequest->GetDescription().c_str() );
799 
800  TaskManager *taskMgr = pPostMaster->GetTaskManager();
801  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
802  }
803  else
804  {
805  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
806  pUrl.GetHostId().c_str(),
807  pRequest->GetDescription().c_str() );
808  HandleError( Status( stError, errOperationExpired) );
809  }
810  return;
811  }
812 
813  //------------------------------------------------------------------------
814  // kXR_waitresp - the response will be returned in some seconds as an
815  // unsolicited message. Currently all messages of this type are handled
816  // one step before in the XrdClStream::OnIncoming as they need to be
817  // processed synchronously.
818  //------------------------------------------------------------------------
819  case kXR_waitresp:
820  {
821  if( rsp->hdr.dlen < 4 )
822  {
823  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
824  pUrl.GetHostId().c_str() );
825  pStatus = Status( stError, errInvalidResponse );
826  HandleResponse();
827  return;
828  }
829 
830  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
831  "message %s", pUrl.GetHostId().c_str(),
832  rsp->body.waitresp.seconds,
833  pRequest->GetDescription().c_str() );
834  return;
835  }
836 
837  //------------------------------------------------------------------------
838  // Default - unrecognized/unsupported response, declare an error
839  //------------------------------------------------------------------------
840  default:
841  {
842  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
843  "message %s", pUrl.GetHostId().c_str(),
844  rsp->hdr.status, pRequest->GetDescription().c_str() );
845  pStatus = Status( stError, errInvalidResponse );
846  HandleResponse();
847  return;
848  }
849  }
850 
851  return;
852  }
853 
854  //----------------------------------------------------------------------------
855  // Handle an event other that a message arrival - may be timeout
856  //----------------------------------------------------------------------------
858  XRootDStatus status )
859  {
860  Log *log = DefaultEnv::GetLog();
861  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
862  pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
863 
864  if( event == Ready )
865  return 0;
866 
867  if( pTimeoutFence.load( std::memory_order_relaxed ) )
868  return 0;
869 
870  HandleError( status );
871  return RemoveHandler;
872  }
873 
874  //----------------------------------------------------------------------------
875  // Read message body directly from a socket
876  //----------------------------------------------------------------------------
878  Socket *socket,
879  uint32_t &bytesRead )
880  {
881  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
882  uint16_t reqId = ntohs( req->header.requestid );
883 
884  if( reqId == kXR_pgread )
885  return pPageReader->Read( *socket, bytesRead );
886 
887  return pBodyReader->Read( *socket, bytesRead );
888  }
889 
890  //----------------------------------------------------------------------------
891  // We're here when we requested sending something over the wire
892  // and there has been a status update on this action
893  //----------------------------------------------------------------------------
895  XRootDStatus status )
896  {
897  Log *log = DefaultEnv::GetLog();
898 
899  //--------------------------------------------------------------------------
900  // We were successful, so we now need to listen for a response
901  //--------------------------------------------------------------------------
902  if( status.IsOK() )
903  {
904  log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
905  pUrl.GetHostId().c_str(), message->GetDescription().c_str() );
906 
907  log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: 0x%x (message: %s ) from out-queue to in-queue.",
908  pUrl.GetHostId().c_str(), this,
909  pRequest->GetDescription().c_str() );
910 
911  pMsgInFly = true;
912  return;
913  }
914 
915  //--------------------------------------------------------------------------
916  // We have failed, recover if possible
917  //--------------------------------------------------------------------------
918  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
919  "recover.", pUrl.GetHostId().c_str(),
920  message->GetDescription().c_str() );
921  HandleError( status );
922  }
923 
924  //----------------------------------------------------------------------------
925  // Are we a raw writer or not?
926  //----------------------------------------------------------------------------
928  {
929  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
930  uint16_t reqId = ntohs( req->header.requestid );
931  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
932  return true;
933  // checkpoint + execute
934  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
935  {
936  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
937  reqId = ntohs( xeq->header.requestid );
938  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
939  }
940 
941  return false;
942  }
943 
944  //----------------------------------------------------------------------------
945  // Write the message body
946  //----------------------------------------------------------------------------
948  uint32_t &bytesWritten )
949  {
950  //--------------------------------------------------------------------------
951  // First check if it is a PgWrite
952  //--------------------------------------------------------------------------
953  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
954  {
955  //------------------------------------------------------------------------
956  // PgWrite will have just one chunk
957  //------------------------------------------------------------------------
958  ChunkInfo chunk = pChunkList->front();
959  //------------------------------------------------------------------------
960  // Calculate the size of the first and last page (in case the chunk is not
961  // 4KB aligned)
962  //------------------------------------------------------------------------
963  int fLen = 0, lLen = 0;
964  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
965 
966  //------------------------------------------------------------------------
967  // Set the crc32c buffer if not ready yet
968  //------------------------------------------------------------------------
969  if( pPgWrtCksumBuff.GetCursor() == 0 )
970  {
971  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
972  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
973  }
974 
975  uint32_t btsLeft = chunk.length - pAsyncOffset;
976  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
977  if( pglen > btsLeft ) pglen = btsLeft;
978  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
979 
980  while( btsLeft > 0 )
981  {
982  // first write the crc32c digest
983  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
984  {
985  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
986  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
987  int btswrt = 0;
988  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
989  if( !st.IsOK() ) return st;
990  bytesWritten += btswrt;
991  pPgWrtCksumBuff.AdvanceCursor( btswrt );
992  if( st.code == suRetry ) return st;
993  }
994  // then write the raw data (one page)
995  int btswrt = 0;
996  Status st = socket->Send( pgbuf, pglen, btswrt );
997  if( !st.IsOK() ) return st;
998  pgbuf += btswrt;
999  pglen -= btswrt;
1000  btsLeft -= btswrt;
1001  bytesWritten += btswrt;
1002  pAsyncOffset += btswrt; // update the offset to the raw data
1003  if( st.code == suRetry ) return st;
1004  // if we managed to write all the data ...
1005  if( pglen == 0 )
1006  {
1007  // move to the next page
1008  ++pPgWrtCurrentPageNb;
1009  if( pPgWrtCurrentPageNb < nbpgs )
1010  {
1011  // set the digest buffer
1012  pPgWrtCksumBuff.SetCursor( 0 );
1013  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1014  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1015  }
1016  // set the page length
1017  pglen = XrdSys::PageSize;
1018  if( pglen > btsLeft ) pglen = btsLeft;
1019  // reset offset in the current page
1020  pPgWrtCurrentPageOffset = 0;
1021  }
1022  else
1023  // otherwise just adjust the offset in the current page
1024  pPgWrtCurrentPageOffset += btswrt;
1025 
1026  }
1027  }
1028  else if( !pChunkList->empty() )
1029  {
1030  size_t size = pChunkList->size();
1031  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1032  {
1033  char *buffer = (char*)(*pChunkList)[i].buffer;
1034  uint32_t size = (*pChunkList)[i].length;
1035  size_t leftToBeWritten = size - pAsyncOffset;
1036 
1037  while( leftToBeWritten )
1038  {
1039  int btswrt = 0;
1040  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1041  bytesWritten += btswrt;
1042  if( !st.IsOK() || st.code == suRetry ) return st;
1043  pAsyncOffset += btswrt;
1044  leftToBeWritten -= btswrt;
1045  }
1046  //----------------------------------------------------------------------
1047  // Remember that we have moved to the next chunk, also clear the offset
1048  // within the buffer as we are going to move to a new one
1049  //----------------------------------------------------------------------
1050  ++pAsyncChunkIndex;
1051  pAsyncOffset = 0;
1052  }
1053  }
1054  else
1055  {
1056  Log *log = DefaultEnv::GetLog();
1057 
1058  //------------------------------------------------------------------------
1059  // If the socket is encrypted we cannot use a kernel buffer, we have to
1060  // convert to user space buffer
1061  //------------------------------------------------------------------------
1062  if( socket->IsEncrypted() )
1063  {
1064  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1065  pUrl.GetHostId().c_str() );
1066 
1067  char *ubuff = 0;
1068  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1069  if( ret < 0 ) return Status( stError, errInternal );
1070  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1071  return WriteMessageBody( socket, bytesWritten );
1072  }
1073 
1074  //------------------------------------------------------------------------
1075  // Send the data
1076  //------------------------------------------------------------------------
1077  while( !pKBuff->Empty() )
1078  {
1079  int btswrt = 0;
1080  Status st = socket->Send( *pKBuff, btswrt );
1081  bytesWritten += btswrt;
1082  if( !st.IsOK() || st.code == suRetry ) return st;
1083  }
1084 
1085  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1086  pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
1087  }
1088 
1089  return Status();
1090  }
1091 
1092  //----------------------------------------------------------------------------
1093  // We're here when we got a time event. We needed to re-issue the request
1094  // in some time in the future, and that moment has arrived
1095  //----------------------------------------------------------------------------
1097  {
1098  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1099  }
1100 
1101  //----------------------------------------------------------------------------
1102  // Bookkeeping after partial response has been received.
1103  //----------------------------------------------------------------------------
1105  {
1106  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1107  }
1108 
1109  //----------------------------------------------------------------------------
1110  // Unpack the message and call the response handler
1111  //----------------------------------------------------------------------------
1112  void XRootDMsgHandler::HandleResponse()
1113  {
1114  //--------------------------------------------------------------------------
1115  // Process the response and notify the listener
1116  //--------------------------------------------------------------------------
1118  XRootDStatus *status = ProcessStatus();
1119  AnyObject *response = 0;
1120 
1121  Log *log = DefaultEnv::GetLog();
1122  log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: 0x%x (message: %s ) "
1123  "with status: %s.",
1124  pUrl.GetHostId().c_str(), this,
1125  pRequest->GetDescription().c_str(),
1126  status->ToString().c_str() );
1127 
1128  if( status->IsOK() )
1129  {
1130  Status st = ParseResponse( response );
1131  if( !st.IsOK() )
1132  {
1133  delete status;
1134  delete response;
1135  status = new XRootDStatus( st );
1136  response = 0;
1137  }
1138  }
1139 
1140  //--------------------------------------------------------------------------
1141  // Close the redirect entry if necessary
1142  //--------------------------------------------------------------------------
1143  if( pRdirEntry )
1144  {
1145  pRdirEntry->status = *status;
1146  pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1147  }
1148 
1149  //--------------------------------------------------------------------------
1150  // Is it a final response?
1151  //--------------------------------------------------------------------------
1152  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1153 
1154  //--------------------------------------------------------------------------
1155  // Release the stream id
1156  //--------------------------------------------------------------------------
1157  if( pSidMgr && finalrsp )
1158  {
1159  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1160  if( status->IsOK() || !pMsgInFly ||
1161  !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1162  pSidMgr->ReleaseSID( req->header.streamid );
1163  }
1164 
1165  HostList *hosts = pHosts.release();
1166  if( !finalrsp )
1167  pHosts.reset( new HostList( *hosts ) );
1168 
1169  pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1170 
1171  //--------------------------------------------------------------------------
1172  // if it is the final response there is nothing more to do ...
1173  //--------------------------------------------------------------------------
1174  if( finalrsp )
1175  delete this;
1176  //--------------------------------------------------------------------------
1177  // on the other hand if it is not the final response, we have to keep the
1178  // MsgHandler and delete the current response
1179  //--------------------------------------------------------------------------
1180  else
1181  {
1182  XrdSysCondVarHelper lck( pCV );
1183  pResponse.reset();
1184  pTimeoutFence.store( false, std::memory_order_relaxed );
1185  pCV.Broadcast();
1186  }
1187  }
1188 
1189 
1190  //----------------------------------------------------------------------------
1191  // Extract the status information from the stuff that we got
1192  //----------------------------------------------------------------------------
1193  XRootDStatus *XRootDMsgHandler::ProcessStatus()
1194  {
1195  XRootDStatus *st = new XRootDStatus( pStatus );
1196  ServerResponse *rsp = 0;
1197  if( pResponse )
1198  rsp = (ServerResponse *)pResponse->GetBuffer();
1199 
1200  if( !pStatus.IsOK() && rsp )
1201  {
1202  if( pStatus.code == errErrorResponse )
1203  {
1204  st->errNo = rsp->body.error.errnum;
1205  // omit the last character as the string returned from the server
1206  // (acording to protocol specs) should be null-terminated
1207  std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1208  if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1209  errmsg += " Last seen error: " + pLastError.ToString();
1210  st->SetErrorMessage( errmsg );
1211  }
1212  else if( pStatus.code == errRedirect )
1213  st->SetErrorMessage( pRedirectUrl );
1214  }
1215  return st;
1216  }
1217 
1218  //------------------------------------------------------------------------
1219  // Parse the response and put it in an object that could be passed to
1220  // the user
1221  //------------------------------------------------------------------------
1222  Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1223  {
1224  if( !pResponse )
1225  return Status();
1226 
1227  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1228  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1229  Log *log = DefaultEnv::GetLog();
1230 
1231  //--------------------------------------------------------------------------
1232  // Handle redirect as an answer
1233  //--------------------------------------------------------------------------
1234  if( rsp->hdr.status == kXR_redirect )
1235  {
1236  log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1237  return 0;
1238  }
1239 
1240  Buffer buff;
1241  uint32_t length = 0;
1242  char *buffer = 0;
1243 
1244  //--------------------------------------------------------------------------
1245  // We don't have any partial answers so pass what we have
1246  //--------------------------------------------------------------------------
1247  if( pPartialResps.empty() )
1248  {
1249  buffer = rsp->body.buffer.data;
1250  length = rsp->hdr.dlen;
1251  }
1252  //--------------------------------------------------------------------------
1253  // Partial answers, we need to glue them together before parsing
1254  //--------------------------------------------------------------------------
1255  else if( req->header.requestid != kXR_read &&
1256  req->header.requestid != kXR_readv )
1257  {
1258  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1259  {
1260  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1261  length += part->hdr.dlen;
1262  }
1263  length += rsp->hdr.dlen;
1264 
1265  buff.Allocate( length );
1266  uint32_t offset = 0;
1267  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1268  {
1269  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1270  buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1271  offset += part->hdr.dlen;
1272  }
1273  buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1274  buffer = buff.GetBuffer();
1275  }
1276 
1277  //--------------------------------------------------------------------------
1278  // Right, but what was the question?
1279  //--------------------------------------------------------------------------
1280  switch( req->header.requestid )
1281  {
1282  //------------------------------------------------------------------------
1283  // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1284  // kXR_ping, kXR_close, kXR_write, kXR_sync
1285  //------------------------------------------------------------------------
1286  case kXR_mv:
1287  case kXR_truncate:
1288  case kXR_rm:
1289  case kXR_mkdir:
1290  case kXR_rmdir:
1291  case kXR_chmod:
1292  case kXR_ping:
1293  case kXR_close:
1294  case kXR_write:
1295  case kXR_writev:
1296  case kXR_sync:
1297  case kXR_chkpoint:
1298  return Status();
1299 
1300  //------------------------------------------------------------------------
1301  // kXR_locate
1302  //------------------------------------------------------------------------
1303  case kXR_locate:
1304  {
1305  AnyObject *obj = new AnyObject();
1306 
1307  char *nullBuffer = new char[length+1];
1308  nullBuffer[length] = 0;
1309  memcpy( nullBuffer, buffer, length );
1310 
1311  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1312  "LocateInfo: %s", pUrl.GetHostId().c_str(),
1313  pRequest->GetDescription().c_str(), nullBuffer );
1314  LocationInfo *data = new LocationInfo();
1315 
1316  if( data->ParseServerResponse( nullBuffer ) == false )
1317  {
1318  delete obj;
1319  delete data;
1320  delete [] nullBuffer;
1321  return Status( stError, errInvalidResponse );
1322  }
1323  delete [] nullBuffer;
1324 
1325  obj->Set( data );
1326  response = obj;
1327  return Status();
1328  }
1329 
1330  //------------------------------------------------------------------------
1331  // kXR_stat
1332  //------------------------------------------------------------------------
1333  case kXR_stat:
1334  {
1335  AnyObject *obj = new AnyObject();
1336 
1337  //----------------------------------------------------------------------
1338  // Virtual File System stat (kXR_vfs)
1339  //----------------------------------------------------------------------
1340  if( req->stat.options & kXR_vfs )
1341  {
1342  StatInfoVFS *data = new StatInfoVFS();
1343 
1344  char *nullBuffer = new char[length+1];
1345  nullBuffer[length] = 0;
1346  memcpy( nullBuffer, buffer, length );
1347 
1348  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1349  "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1350  pRequest->GetDescription().c_str(), nullBuffer );
1351 
1352  if( data->ParseServerResponse( nullBuffer ) == false )
1353  {
1354  delete obj;
1355  delete data;
1356  delete [] nullBuffer;
1357  return Status( stError, errInvalidResponse );
1358  }
1359  delete [] nullBuffer;
1360 
1361  obj->Set( data );
1362  }
1363  //----------------------------------------------------------------------
1364  // Normal stat
1365  //----------------------------------------------------------------------
1366  else
1367  {
1368  StatInfo *data = new StatInfo();
1369 
1370  char *nullBuffer = new char[length+1];
1371  nullBuffer[length] = 0;
1372  memcpy( nullBuffer, buffer, length );
1373 
1374  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1375  "%s", pUrl.GetHostId().c_str(),
1376  pRequest->GetDescription().c_str(), nullBuffer );
1377 
1378  if( data->ParseServerResponse( nullBuffer ) == false )
1379  {
1380  delete obj;
1381  delete data;
1382  delete [] nullBuffer;
1383  return Status( stError, errInvalidResponse );
1384  }
1385  delete [] nullBuffer;
1386  obj->Set( data );
1387  }
1388 
1389  response = obj;
1390  return Status();
1391  }
1392 
1393  //------------------------------------------------------------------------
1394  // kXR_protocol
1395  //------------------------------------------------------------------------
1396  case kXR_protocol:
1397  {
1398  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1399  pUrl.GetHostId().c_str(),
1400  pRequest->GetDescription().c_str() );
1401 
1402  if( rsp->hdr.dlen < 8 )
1403  {
1404  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1405  pUrl.GetHostId().c_str() );
1406  return Status( stError, errInvalidResponse );
1407  }
1408 
1409  AnyObject *obj = new AnyObject();
1410  ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1411  rsp->body.protocol.flags );
1412  obj->Set( data );
1413  response = obj;
1414  return Status();
1415  }
1416 
1417  //------------------------------------------------------------------------
1418  // kXR_dirlist
1419  //------------------------------------------------------------------------
1420  case kXR_dirlist:
1421  {
1422  AnyObject *obj = new AnyObject();
1423  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1424  "DirectoryList", pUrl.GetHostId().c_str(),
1425  pRequest->GetDescription().c_str() );
1426 
1427  char *path = new char[req->dirlist.dlen+1];
1428  path[req->dirlist.dlen] = 0;
1429  memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1430 
1431  DirectoryList *data = new DirectoryList();
1432  data->SetParentName( path );
1433  delete [] path;
1434 
1435  char *nullBuffer = new char[length+1];
1436  nullBuffer[length] = 0;
1437  memcpy( nullBuffer, buffer, length );
1438 
1439  bool invalidrsp = false;
1440 
1441  if( !pDirListStarted )
1442  {
1443  pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1444  pDirListStarted = true;
1445 
1446  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1447  }
1448  else
1449  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1450 
1451  if( invalidrsp )
1452  {
1453  delete data;
1454  delete obj;
1455  delete [] nullBuffer;
1456  return Status( stError, errInvalidResponse );
1457  }
1458 
1459  delete [] nullBuffer;
1460  obj->Set( data );
1461  response = obj;
1462  return Status();
1463  }
1464 
1465  //------------------------------------------------------------------------
1466  // kXR_open - if we got the statistics, otherwise return 0
1467  //------------------------------------------------------------------------
1468  case kXR_open:
1469  {
1470  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1471  pUrl.GetHostId().c_str(),
1472  pRequest->GetDescription().c_str() );
1473 
1474  if( rsp->hdr.dlen < 4 )
1475  {
1476  log->Error( XRootDMsg, "[%s] Got invalid open response.",
1477  pUrl.GetHostId().c_str() );
1478  return Status( stError, errInvalidResponse );
1479  }
1480 
1481  AnyObject *obj = new AnyObject();
1482  StatInfo *statInfo = 0;
1483 
1484  //----------------------------------------------------------------------
1485  // Handle StatInfo if requested
1486  //----------------------------------------------------------------------
1487  if( req->open.options & kXR_retstat )
1488  {
1489  log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1490  pUrl.GetHostId().c_str(),
1491  pRequest->GetDescription().c_str() );
1492 
1493  if( rsp->hdr.dlen >= 12 )
1494  {
1495  char *nullBuffer = new char[rsp->hdr.dlen-11];
1496  nullBuffer[rsp->hdr.dlen-12] = 0;
1497  memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1498 
1499  statInfo = new StatInfo();
1500  if( statInfo->ParseServerResponse( nullBuffer ) == false )
1501  {
1502  delete statInfo;
1503  statInfo = 0;
1504  }
1505  delete [] nullBuffer;
1506  }
1507 
1508  if( rsp->hdr.dlen < 12 || !statInfo )
1509  {
1510  log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1511  "to %s", pUrl.GetHostId().c_str(),
1512  pRequest->GetDescription().c_str() );
1513  delete obj;
1514  return Status( stError, errInvalidResponse );
1515  }
1516  }
1517 
1518  OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1519  pResponse->GetSessionId(),
1520  statInfo );
1521  obj->Set( data );
1522  response = obj;
1523  return Status();
1524  }
1525 
1526  //------------------------------------------------------------------------
1527  // kXR_read
1528  //------------------------------------------------------------------------
1529  case kXR_read:
1530  {
1531  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1532  pUrl.GetHostId().c_str(),
1533  pRequest->GetDescription().c_str() );
1534 
1535  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1536  {
1537  //--------------------------------------------------------------------
1538  // we are expecting to have only the header in the message, the raw
1539  // data have been readout into the user buffer
1540  //--------------------------------------------------------------------
1541  if( pPartialResps[i]->GetSize() > 8 )
1542  return Status( stOK, errInternal );
1543  }
1544  //----------------------------------------------------------------------
1545  // we are expecting to have only the header in the message, the raw
1546  // data have been readout into the user buffer
1547  //----------------------------------------------------------------------
1548  if( pResponse->GetSize() > 8 )
1549  return Status( stOK, errInternal );
1550  //----------------------------------------------------------------------
1551  // Get the response for the end user
1552  //----------------------------------------------------------------------
1553  return pBodyReader->GetResponse( response );
1554  }
1555 
1556  //------------------------------------------------------------------------
1557  // kXR_pgread
1558  //------------------------------------------------------------------------
1559  case kXR_pgread:
1560  {
1561  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1562  pUrl.GetHostId().c_str(),
1563  pRequest->GetDescription().c_str() );
1564 
1565  //----------------------------------------------------------------------
1566  // Glue in the cached responses if necessary
1567  //----------------------------------------------------------------------
1568  ChunkInfo chunk = pChunkList->front();
1569  bool sizeMismatch = false;
1570  uint32_t currentOffset = 0;
1571  char *cursor = (char*)chunk.buffer;
1572  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1573  {
1574  ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1575 
1576  //--------------------------------------------------------------------
1577  // the actual size of the raw data without the crc32c checksums
1578  //--------------------------------------------------------------------
1579  size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1580  part->status.bdy.dlen ) * CksumSize;
1581 
1582  if( currentOffset + datalen > chunk.length )
1583  {
1584  sizeMismatch = true;
1585  break;
1586  }
1587 
1588  currentOffset += datalen;
1589  cursor += datalen;
1590  }
1591 
1592  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1593  size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1594  rspst->status.bdy.dlen ) * CksumSize;
1595  if( currentOffset + datalen <= chunk.length )
1596  currentOffset += datalen;
1597  else
1598  sizeMismatch = true;
1599 
1600  //----------------------------------------------------------------------
1601  // Overflow
1602  //----------------------------------------------------------------------
1603  if( pChunkStatus.front().sizeError || sizeMismatch )
1604  {
1605  log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1606  "buffer is too small for the received data.",
1607  pUrl.GetHostId().c_str(),
1608  pRequest->GetDescription().c_str() );
1609  return Status( stError, errInvalidResponse );
1610  }
1611 
1612  AnyObject *obj = new AnyObject();
1613  PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1614  std::move( pCrc32cDigests) );
1615 
1616  obj->Set( pgInfo );
1617  response = obj;
1618  return Status();
1619  }
1620 
1621  //------------------------------------------------------------------------
1622  // kXR_pgwrite
1623  //------------------------------------------------------------------------
1624  case kXR_pgwrite:
1625  {
1626  std::vector<std::tuple<uint64_t, uint32_t>> retries;
1627 
1628  ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1629  if( rsp->status.bdy.dlen > 0 )
1630  {
1631  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1632  size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1633  retries.reserve( pgcnt );
1634  kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1635  sizeof( ServerResponseBody_pgWrCSE ) );
1636 
1637  for( size_t i = 0; i < pgcnt; ++i )
1638  {
1639  uint32_t len = XrdSys::PageSize;
1640  if( i == 0 ) len = cse->dlFirst;
1641  else if( i == pgcnt - 1 ) len = cse->dlLast;
1642  retries.push_back( std::make_tuple( pgoffs[i], len ) );
1643  }
1644  }
1645 
1646  RetryInfo *info = new RetryInfo( std::move( retries ) );
1647  AnyObject *obj = new AnyObject();
1648  obj->Set( info );
1649  response = obj;
1650 
1651  return Status();
1652  }
1653 
1654 
1655  //------------------------------------------------------------------------
1656  // kXR_readv - we need to pass the length of the buffer to the user code
1657  //------------------------------------------------------------------------
1658  case kXR_readv:
1659  {
1660  log->Dump( XRootDMsg, "[%s] Parsing the response to 0x%x as "
1661  "VectorReadInfo", pUrl.GetHostId().c_str(),
1662  pRequest->GetDescription().c_str() );
1663 
1664  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1665  {
1666  //--------------------------------------------------------------------
1667  // we are expecting to have only the header in the message, the raw
1668  // data have been readout into the user buffer
1669  //--------------------------------------------------------------------
1670  if( pPartialResps[i]->GetSize() > 8 )
1671  return Status( stOK, errInternal );
1672  }
1673  //----------------------------------------------------------------------
1674  // we are expecting to have only the header in the message, the raw
1675  // data have been readout into the user buffer
1676  //----------------------------------------------------------------------
1677  if( pResponse->GetSize() > 8 )
1678  return Status( stOK, errInternal );
1679  //----------------------------------------------------------------------
1680  // Get the response for the end user
1681  //----------------------------------------------------------------------
1682  return pBodyReader->GetResponse( response );
1683  }
1684 
1685  //------------------------------------------------------------------------
1686  // kXR_fattr
1687  //------------------------------------------------------------------------
1688  case kXR_fattr:
1689  {
1690  int len = rsp->hdr.dlen;
1691  char* data = rsp->body.buffer.data;
1692 
1693  return ParseXAttrResponse( data, len, response );
1694  }
1695 
1696  //------------------------------------------------------------------------
1697  // kXR_query
1698  //------------------------------------------------------------------------
1699  case kXR_query:
1700  case kXR_set:
1701  case kXR_prepare:
1702  default:
1703  {
1704  AnyObject *obj = new AnyObject();
1705  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1706  pUrl.GetHostId().c_str(),
1707  pRequest->GetDescription().c_str() );
1708 
1709  BinaryDataInfo *data = new BinaryDataInfo();
1710  data->Allocate( length );
1711  data->Append( buffer, length );
1712  obj->Set( data );
1713  response = obj;
1714  return Status();
1715  }
1716  };
1717  return Status( stError, errInvalidMessage );
1718  }
1719 
1720  //------------------------------------------------------------------------
1721  // Parse the response to kXR_fattr request and put it in an object that
1722  // could be passed to the user
1723  //------------------------------------------------------------------------
1724  Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1725  AnyObject *&response )
1726  {
1727  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1728 // Log *log = DefaultEnv::GetLog(); //TODO
1729 
1730  switch( req->fattr.subcode )
1731  {
1732  case kXR_fattrDel:
1733  case kXR_fattrSet:
1734  {
1735  Status status;
1736 
1737  kXR_char nerrs = 0;
1738  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1739  return status;
1740 
1741  kXR_char nattr = 0;
1742  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1743  return status;
1744 
1745  std::vector<XAttrStatus> resp;
1746  // read the namevec
1747  for( kXR_char i = 0; i < nattr; ++i )
1748  {
1749  kXR_unt16 rc = 0;
1750  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1751  return status;
1752  rc = ntohs( rc );
1753 
1754  // count errors
1755  if( rc ) --nerrs;
1756 
1757  std::string name;
1758  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1759  return status;
1760 
1761  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1762  XRootDStatus();
1763  resp.push_back( XAttrStatus( name, st ) );
1764  }
1765 
1766  // check if we read all the data and if the error count is OK
1767  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1768 
1769  // set up the response object
1770  response = new AnyObject();
1771  response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1772 
1773  return Status();
1774  }
1775 
1776  case kXR_fattrGet:
1777  {
1778  Status status;
1779 
1780  kXR_char nerrs = 0;
1781  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1782  return status;
1783 
1784  kXR_char nattr = 0;
1785  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1786  return status;
1787 
1788  std::vector<XAttr> resp;
1789  resp.reserve( nattr );
1790 
1791  // read the name vec
1792  for( kXR_char i = 0; i < nattr; ++i )
1793  {
1794  kXR_unt16 rc = 0;
1795  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1796  return status;
1797  rc = ntohs( rc );
1798 
1799  // count errors
1800  if( rc ) --nerrs;
1801 
1802  std::string name;
1803  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1804  return status;
1805 
1806  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1807  XRootDStatus();
1808  resp.push_back( XAttr( name, st ) );
1809  }
1810 
1811  // read the value vec
1812  for( kXR_char i = 0; i < nattr; ++i )
1813  {
1814  kXR_int32 vlen = 0;
1815  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1816  return status;
1817  vlen = ntohl( vlen );
1818 
1819  std::string value;
1820  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1821  return status;
1822 
1823  resp[i].value.swap( value );
1824  }
1825 
1826  // check if we read all the data and if the error count is OK
1827  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1828 
1829  // set up the response object
1830  response = new AnyObject();
1831  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1832 
1833  return Status();
1834  }
1835 
1836  case kXR_fattrList:
1837  {
1838  Status status;
1839  std::vector<XAttr> resp;
1840 
1841  while( len > 0 )
1842  {
1843  std::string name;
1844  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1845  return status;
1846 
1847  kXR_int32 vlen = 0;
1848  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1849  return status;
1850  vlen = ntohl( vlen );
1851 
1852  std::string value;
1853  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1854  return status;
1855 
1856  resp.push_back( XAttr( name, value ) );
1857  }
1858 
1859  // set up the response object
1860  response = new AnyObject();
1861  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1862 
1863  return Status();
1864  }
1865 
1866  default:
1867  return Status( stError, errDataError );
1868  }
1869  }
1870 
1871  //----------------------------------------------------------------------------
1872  // Perform the changes to the original request needed by the redirect
1873  // procedure - allocate new streamid, append redirection data and such
1874  //----------------------------------------------------------------------------
1875  Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1876  {
1877  Log *log = DefaultEnv::GetLog();
1878 
1879  Status st;
1880  // Append any "xrd.*" parameters present in newCgi so that any authentication
1881  // requirements are properly enforced
1882  const URL::ParamsMap &newCgi = newUrl.GetParams();
1883  std::string xrdCgi = "";
1884  std::ostringstream ossXrd;
1885  for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1886  {
1887  if( it->first.compare( 0, 4, "xrd." ) )
1888  continue;
1889  ossXrd << it->first << '=' << it->second << '&';
1890  }
1891 
1892  xrdCgi = ossXrd.str();
1893  // Redirection URL containing also any original xrd.* opaque parameters
1894  XrdCl::URL authUrl;
1895 
1896  if (xrdCgi.empty())
1897  {
1898  authUrl = newUrl;
1899  }
1900  else
1901  {
1902  std::string surl = newUrl.GetURL();
1903  (surl.find('?') == std::string::npos) ? (surl += '?') :
1904  ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1905  surl += xrdCgi;
1906 
1907  if (!authUrl.FromString(surl))
1908  {
1909  log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data:"
1910  "%s", surl.c_str());
1911  return Status(stError, errInvalidRedirectURL);
1912  }
1913  }
1914 
1915  //--------------------------------------------------------------------------
1916  // Rewrite particular requests
1917  //--------------------------------------------------------------------------
1919  MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1921  return Status();
1922  }
1923 
1924  //----------------------------------------------------------------------------
1925  // Some requests need to be rewritten also after getting kXR_wait
1926  //----------------------------------------------------------------------------
1927  Status XRootDMsgHandler::RewriteRequestWait()
1928  {
1929  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1930 
1932 
1933  //------------------------------------------------------------------------
1934  // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
1935  // turned off after wait
1936  //------------------------------------------------------------------------
1937  switch( req->header.requestid )
1938  {
1939  case kXR_locate:
1940  {
1941  uint16_t refresh = kXR_refresh;
1942  req->locate.options &= (~refresh);
1943  break;
1944  }
1945 
1946  case kXR_open:
1947  {
1948  uint16_t refresh = kXR_refresh;
1949  req->locate.options &= (~refresh);
1950  break;
1951  }
1952  }
1953 
1954  XRootDTransport::SetDescription( pRequest );
1956  return Status();
1957  }
1958 
1959  //----------------------------------------------------------------------------
1960  // Recover error
1961  //----------------------------------------------------------------------------
1962  void XRootDMsgHandler::HandleError( XRootDStatus status )
1963  {
1964  //--------------------------------------------------------------------------
1965  // If there was no error then do nothing
1966  //--------------------------------------------------------------------------
1967  if( status.IsOK() )
1968  return;
1969 
1970  if( pSidMgr && pMsgInFly && (
1971  status.code == errOperationExpired ||
1972  status.code == errOperationInterrupted ) )
1973  {
1974  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1975  pSidMgr->TimeOutSID( req->header.streamid );
1976  }
1977 
1978  bool noreplicas = ( status.code == errErrorResponse &&
1979  status.errNo == kXR_noReplicas );
1980 
1981  if( !noreplicas ) pLastError = status;
1982 
1983  Log *log = DefaultEnv::GetLog();
1984  log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
1985  pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str(),
1986  status.ToString().c_str() );
1987 
1988  //--------------------------------------------------------------------------
1989  // Check if it is a fatal TLS error that has been marked as potentially
1990  // recoverable, if yes check if we can downgrade from fatal to error.
1991  //--------------------------------------------------------------------------
1992  if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
1993  {
1994  if( pSslErrCnt < MaxSslErrRetry )
1995  {
1996  status.status &= ~stFatal; // switch off fatal&error bits
1997  status.status |= stError; // switch on error bit
1998  }
1999  ++pSslErrCnt; // count number of consecutive SSL errors
2000  }
2001  else
2002  pSslErrCnt = 0;
2003 
2004  //--------------------------------------------------------------------------
2005  // We have got an error message, we can recover it at the load balancer if:
2006  // 1) we haven't got it from the load balancer
2007  // 2) we have a load balancer assigned
2008  // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2009  // kXR_NotFound
2010  // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2011  //--------------------------------------------------------------------------
2012  if( status.code == errErrorResponse )
2013  {
2014  if( RetriableErrorResponse( status ) )
2015  {
2016  UpdateTriedCGI(status.errNo);
2017  if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2018  SwitchOnRefreshFlag();
2019  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2020  return;
2021  }
2022  else
2023  {
2024  pStatus = status;
2025  HandleRspOrQueue();
2026  return;
2027  }
2028  }
2029 
2030  //--------------------------------------------------------------------------
2031  // Nothing can be done if:
2032  // 1) a user timeout has occurred
2033  // 2) has a non-zero session id
2034  // 3) if another error occurred and the validity of the message expired
2035  //--------------------------------------------------------------------------
2036  if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2037  status.code == errOperationInterrupted || time(0) >= pExpiration )
2038  {
2039  log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2040  pUrl.GetHostId().c_str(),
2041  pRequest->GetDescription().c_str() );
2042  pStatus = status;
2043  HandleRspOrQueue();
2044  return;
2045  }
2046 
2047  //--------------------------------------------------------------------------
2048  // At this point we're left with connection errors, we recover them
2049  // at a load balancer if we have one and if not on the current server
2050  // until we get a response, an unrecoverable error or a timeout
2051  //--------------------------------------------------------------------------
2052  if( pLoadBalancer.url.IsValid() &&
2053  pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2054  {
2055  UpdateTriedCGI( kXR_ServerError );
2056  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2057  return;
2058  }
2059  else
2060  {
2061  if( !status.IsFatal() && IsRetriable() )
2062  {
2063  log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2064  pUrl.GetHostId().c_str(),
2065  pRequest->GetDescription().c_str() );
2066 
2067  UpdateTriedCGI( kXR_ServerError );
2068  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2069  return;
2070  }
2071  pStatus = status;
2072  HandleRspOrQueue();
2073  return;
2074  }
2075  }
2076 
2077  //----------------------------------------------------------------------------
2078  // Retry the message at another server
2079  //----------------------------------------------------------------------------
2080  Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2081  {
2082  pResponse.reset();
2083  Log *log = DefaultEnv::GetLog();
2084 
2085  //--------------------------------------------------------------------------
2086  // Set up a redirect entry
2087  //--------------------------------------------------------------------------
2088  if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2089  pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2090 
2091  if( pUrl.GetLocation() != url.GetLocation() )
2092  {
2093  pHosts->push_back( url );
2094 
2095  //------------------------------------------------------------------------
2096  // Assign a new stream id to the message
2097  //------------------------------------------------------------------------
2098 
2099  // first release the old stream id
2100  // (though it could be a redirect from a local
2101  // metalink file, in this case there's no SID)
2102  ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2103  if( pSidMgr )
2104  {
2105  pSidMgr->ReleaseSID( req->streamid );
2106  pSidMgr.reset();
2107  }
2108 
2109  // then get the new SIDManager
2110  // (again this could be a redirect to a local
2111  // file and in this case there is no SID)
2112  if( !url.IsLocalFile() )
2113  {
2114  pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2115  Status st = pSidMgr->AllocateSID( req->streamid );
2116  if( !st.IsOK() )
2117  {
2118  log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2119  pUrl.GetHostId().c_str(),
2120  pRequest->GetDescription().c_str() );
2121  return st;
2122  }
2123  }
2124 
2125  pUrl = url;
2126  }
2127 
2128  if( pUrl.IsMetalink() && pFollowMetalink )
2129  {
2130  log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: 0x%x (message: %s ).",
2131  pUrl.GetHostId().c_str(), this,
2132  pRequest->GetDescription().c_str() );
2133 
2134  return pPostMaster->Redirect( pUrl, pRequest, this );
2135  }
2136  else if( pUrl.IsLocalFile() )
2137  {
2138  HandleLocalRedirect( &pUrl );
2139  return Status();
2140  }
2141  else
2142  {
2143  log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: 0x%x (message: %s ).",
2144  pUrl.GetHostId().c_str(), this,
2145  pRequest->GetDescription().c_str() );
2146  return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2147  }
2148  }
2149 
2150  //----------------------------------------------------------------------------
2151  // Update the "tried=" part of the CGI of the current message
2152  //----------------------------------------------------------------------------
2153  void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2154  {
2155  URL::ParamsMap cgi;
2156  std::string tried;
2157 
2158  //--------------------------------------------------------------------------
2159  // In case a data server responded with a kXR_redirect and we fail at the
2160  // node where we were redirected to, the original data server should be
2161  // included in the tried CGI opaque info (instead of the current one).
2162  //--------------------------------------------------------------------------
2163  if( pEffectiveDataServerUrl )
2164  {
2165  tried = pEffectiveDataServerUrl->GetHostName();
2166  delete pEffectiveDataServerUrl;
2167  pEffectiveDataServerUrl = 0;
2168  }
2169  //--------------------------------------------------------------------------
2170  // Otherwise use the current URL.
2171  //--------------------------------------------------------------------------
2172  else
2173  tried = pUrl.GetHostName();
2174 
2175  // Report the reason for the failure to the next location
2176  //
2177  if (errNo)
2178  { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2179  else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2180  else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2181  else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2182  }
2183 
2184  //--------------------------------------------------------------------------
2185  // If our current load balancer is a metamanager and we failed either
2186  // at a diskserver or at an unidentified node we also exclude the last
2187  // known manager
2188  //--------------------------------------------------------------------------
2189  if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2190  {
2191  HostList::reverse_iterator it;
2192  for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2193  {
2194  if( it->loadBalancer )
2195  break;
2196 
2197  tried += "," + it->url.GetHostName();
2198 
2199  if( it->flags & kXR_isManager )
2200  break;
2201  }
2202  }
2203 
2204  cgi["tried"] = tried;
2206  MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2208  }
2209 
2210  //----------------------------------------------------------------------------
2211  // Switch on the refresh flag for some requests
2212  //----------------------------------------------------------------------------
2213  void XRootDMsgHandler::SwitchOnRefreshFlag()
2214  {
2216  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2217  switch( req->header.requestid )
2218  {
2219  case kXR_locate:
2220  {
2221  req->locate.options |= kXR_refresh;
2222  break;
2223  }
2224 
2225  case kXR_open:
2226  {
2227  req->locate.options |= kXR_refresh;
2228  break;
2229  }
2230  }
2231  XRootDTransport::SetDescription( pRequest );
2233  }
2234 
2235  //------------------------------------------------------------------------
2236  // If the current thread is a worker thread from our thread-pool
2237  // handle the response, otherwise submit a new task to the thread-pool
2238  //------------------------------------------------------------------------
2239  void XRootDMsgHandler::HandleRspOrQueue()
2240  {
2241  JobManager *jobMgr = pPostMaster->GetJobManager();
2242  if( jobMgr->IsWorker() )
2243  HandleResponse();
2244  else
2245  {
2246  Log *log = DefaultEnv::GetLog();
2247  log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: 0x%x (message: %s ).",
2248  pUrl.GetHostId().c_str(), this,
2249  pRequest->GetDescription().c_str() );
2250  jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2251  }
2252  }
2253 
2254  //------------------------------------------------------------------------
2255  // Notify the FileStateHandler to retry Open() with new URL
2256  //------------------------------------------------------------------------
2257  void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2258  {
2259  Log *log = DefaultEnv::GetLog();
2260  log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: 0x%x (message: %s ).",
2261  pUrl.GetHostId().c_str(), this,
2262  pRequest->GetDescription().c_str() );
2263 
2264  if( !pLFileHandler )
2265  {
2266  HandleError( XRootDStatus( stFatal, errNotSupported ) );
2267  return;
2268  }
2269 
2270  AnyObject *resp = 0;
2271  pLFileHandler->SetHostList( *pHosts );
2272  XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2273  if( !st.IsOK() )
2274  {
2275  HandleError( st );
2276  return;
2277  }
2278 
2279  pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2280  resp,
2281  pHosts.release() );
2282  delete this;
2283 
2284  return;
2285  }
2286 
2287  //------------------------------------------------------------------------
2288  // Check if it is OK to retry this request
2289  //------------------------------------------------------------------------
2290  bool XRootDMsgHandler::IsRetriable()
2291  {
2292  std::string value;
2293  DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2294  if( value == "true" ) return true;
2295 
2296  // check if it is a mutable open (open + truncate or open + create)
2297  ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2298  if( req->header.requestid == htons( kXR_open ) )
2299  {
2300  bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2301  ( req->open.options & htons( kXR_new ) );
2302 
2303  if( _mutable )
2304  {
2305  Log *log = DefaultEnv::GetLog();
2306  log->Debug( XRootDMsg,
2307  "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2308  pUrl.GetHostId().c_str(),
2309  pRequest->GetDescription().c_str() );
2310  // disallow retry if it is a mutable open
2311  return false;
2312  }
2313  }
2314 
2315  return true;
2316  }
2317 
2318  //------------------------------------------------------------------------
2319  // Check if for given request and Metalink redirector it is OK to omit
2320  // the kXR_wait and proceed straight to the next entry in the Metalink file
2321  //------------------------------------------------------------------------
2322  bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2323  {
2324  // we can omit kXR_wait only if we have a Metalink redirector
2325  if( !url.IsMetalink() )
2326  return false;
2327 
2328  // we can omit kXR_wait only for requests that can be redirected
2329  // (kXR_read is the only stateful request that can be redirected)
2330  ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2331  if( pStateful && req->header.requestid != kXR_read )
2332  return false;
2333 
2334  // we can only omit kXR_wait if the Metalink redirect has more
2335  // replicas
2336  RedirectorRegistry &registry = RedirectorRegistry::Instance();
2337  VirtualRedirector *redirector = registry.Get( url );
2338 
2339  // we need more than one server as the current one is not reflected
2340  // in tried CGI
2341  if( redirector->Count( request ) > 1 )
2342  return true;
2343 
2344  return false;
2345  }
2346 
2347  //------------------------------------------------------------------------
2348  // Checks if the given error returned by server is retriable.
2349  //------------------------------------------------------------------------
2350  bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2351  {
2352  // we can only retry error response if we have a valid load-balancer and
2353  // it is not our current URL
2354  if( !( pLoadBalancer.url.IsValid() &&
2355  pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2356  return false;
2357 
2358  // following errors are retriable at any load-balancer
2359  if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2360  status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2361  status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2362  return true;
2363 
2364  // check if the load-balancer is a meta-manager, if yes there are
2365  // more errors that can be recovered
2366  if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2367 
2368  // those errors are retriable for meta-managers
2369  if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2370  return true;
2371 
2372  // in case of not-authorized error there is an imposed upper limit
2373  // on how many times we can retry this error
2374  if( status.errNo == kXR_NotAuthorized )
2375  {
2376  int limit = DefaultNotAuthorizedRetryLimit;
2377  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2378  bool ret = pNotAuthorizedCounter < limit;
2379  ++pNotAuthorizedCounter;
2380  if( !ret )
2381  {
2382  Log *log = DefaultEnv::GetLog();
2383  log->Error( XRootDMsg,
2384  "[%s] Reached limit of NotAuthorized retries!",
2385  pUrl.GetHostId().c_str() );
2386  }
2387  return ret;
2388  }
2389 
2390  // check if the load-balancer is a virtual (metalink) redirector,
2391  // if yes there are even more errors that can be recovered
2392  if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2393 
2394  // those errors are retriable for virtual (metalink) redirectors
2395  if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2396  return true;
2397 
2398  // otherwise it is a non-retriable error
2399  return false;
2400  }
2401 
2402  //------------------------------------------------------------------------
2403  // Dump the redirect-trace-back into the log file
2404  //------------------------------------------------------------------------
2405  void XRootDMsgHandler::DumpRedirectTraceBack()
2406  {
2407  if( pRedirectTraceBack.empty() ) return;
2408 
2409  std::stringstream sstrm;
2410 
2411  sstrm << "Redirect trace-back:\n";
2412 
2413  int counter = 0;
2414 
2415  auto itr = pRedirectTraceBack.begin();
2416  sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2417 
2418  auto prev = itr;
2419  ++itr;
2420  ++counter;
2421 
2422  for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2423  sstrm << '\t' << counter << ". "
2424  << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2425 
2426  int authlimit = DefaultNotAuthorizedRetryLimit;
2427  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2428 
2429  bool warn = !pStatus.IsOK() &&
2430  ( pStatus.code == errNotFound ||
2431  pStatus.code == errRedirectLimit ||
2432  ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2433 
2434  Log *log = DefaultEnv::GetLog();
2435  if( warn )
2436  log->Warning( XRootDMsg, sstrm.str().c_str() );
2437  else
2438  log->Debug( XRootDMsg, sstrm.str().c_str() );
2439  }
2440 
2441  // Read data from buffer
2442  //------------------------------------------------------------------------
2443  template<typename T>
2444  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2445  {
2446  if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2447 
2448  memcpy(&result, buffer, sizeof(T));
2449 
2450  buffer += sizeof( T );
2451  buflen -= sizeof( T );
2452 
2453  return Status();
2454  }
2455 
2456  //------------------------------------------------------------------------
2457  // Read a string from buffer
2458  //------------------------------------------------------------------------
2459  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2460  {
2461  Status status;
2462  char c = 0;
2463 
2464  while( true )
2465  {
2466  if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2467  return status;
2468 
2469  if( c == 0 ) break;
2470  result += c;
2471  }
2472 
2473  return status;
2474  }
2475 
2476  //------------------------------------------------------------------------
2477  // Read a string from buffer
2478  //------------------------------------------------------------------------
2479  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2480  size_t size, std::string &result )
2481  {
2482  Status status;
2483 
2484  if( size > buflen ) return Status( stError, errDataError );
2485 
2486  result.append( buffer, size );
2487  buffer += size;
2488  buflen -= size;
2489 
2490  return status;
2491  }
2492 
2493 }
@ kXR_NotAuthorized
Definition: XProtocol.hh:998
@ kXR_NotFound
Definition: XProtocol.hh:999
@ kXR_FileLocked
Definition: XProtocol.hh:991
@ kXR_noReplicas
Definition: XProtocol.hh:1017
@ kXR_Unsupported
Definition: XProtocol.hh:1001
@ kXR_ServerError
Definition: XProtocol.hh:1000
@ kXR_Overloaded
Definition: XProtocol.hh:1012
@ kXR_ArgTooLong
Definition: XProtocol.hh:990
@ kXR_noserver
Definition: XProtocol.hh:1002
@ kXR_IOError
Definition: XProtocol.hh:995
@ kXR_FSError
Definition: XProtocol.hh:993
@ kXR_NoMemory
Definition: XProtocol.hh:996
#define kXR_isManager
Definition: XProtocol.hh:1154
union ServerResponse::@0 body
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
struct ClientFattrRequest fattr
Definition: XProtocol.hh:852
#define kXR_collapseRedir
Definition: XProtocol.hh:1164
ServerResponseStatus status
Definition: XProtocol.hh:1306
#define kXR_attrMeta
Definition: XProtocol.hh:1156
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:912
kXR_unt16 options
Definition: XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition: XProtocol.hh:850
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_refresh
Definition: XProtocol.hh:459
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientOpenRequest open
Definition: XProtocol.hh:858
@ kXR_waitresp
Definition: XProtocol.hh:904
@ kXR_redirect
Definition: XProtocol.hh:902
@ kXR_oksofar
Definition: XProtocol.hh:898
@ kXR_status
Definition: XProtocol.hh:905
@ kXR_ok
Definition: XProtocol.hh:897
@ kXR_attn
Definition: XProtocol.hh:899
@ kXR_wait
Definition: XProtocol.hh:903
@ kXR_error
Definition: XProtocol.hh:901
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1258
struct ClientRequestHdr header
Definition: XProtocol.hh:844
#define kXR_recoverWrts
Definition: XProtocol.hh:1163
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_ping
Definition: XProtocol.hh:123
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
@ kXR_prepare
Definition: XProtocol.hh:133
#define kXR_isServer
Definition: XProtocol.hh:1155
#define kXR_attrVirtRdr
Definition: XProtocol.hh:1159
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:847
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1257
union ServerResponseV2::@1 info
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
@ kXR_vfs
Definition: XProtocol.hh:761
struct ClientStatRequest stat
Definition: XProtocol.hh:871
kXR_char options
Definition: XProtocol.hh:767
#define kXR_ecRedir
Definition: XProtocol.hh:1165
struct ClientLocateRequest locate
Definition: XProtocol.hh:854
ServerResponseHeader hdr
Definition: XProtocol.hh:1284
long long kXR_int64
Definition: XPtypes.hh:98
int kXR_int32
Definition: XPtypes.hh:89
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
const std::string & GetDescription() const
Get the description of the message.
Definition: XrdClMessage.hh:95
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
Definition: XrdClSocket.cc:461
bool IsEncrypted()
Definition: XrdClSocket.cc:867
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:94
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:451
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:165
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition: XrdClURL.cc:58
void SetPassword(const std::string &password)
Set the password.
Definition: XrdClURL.hh:156
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:113
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:388
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:330
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:130
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:148
bool IsLocalFile() const
Definition: XrdClURL.cc:460
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:239
void SetProtocol(const std::string &protocol)
Set protocol.
Definition: XrdClURL.hh:121
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:438
void SetUserName(const std::string &userName)
Set the username.
Definition: XrdClURL.hh:138
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:700
Handle/Process/Forward XRootD messages.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const Message * GetRequest() const
Get the request pointer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
virtual bool IsRaw() const
Are we a raw writer or not?
virtual void Process()
Process the message if it was "taken" by the examine action.
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errNotFound
Definition: XrdClStatus.hh:100
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
none object for initializing empty Optional
XrdSysError Log
Definition: XrdConfig.cc:111
@ kXR_PartialResult
Definition: XProtocol.hh:1247
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
uint32_t flags
Host type.
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version