XRootD
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClURL.hh"
32 #include "XrdCl/XrdClUtils.hh"
34 #include "XrdCl/XrdClJobManager.hh"
35 #include "XrdCl/XrdClSIDManager.hh"
39 #include "XrdCl/XrdClSocket.hh"
40 #include "XrdCl/XrdClTls.hh"
41 #include "XrdCl/XrdClOptimizers.hh"
42 
43 #include "XrdOuc/XrdOucCRC.hh"
45 
46 #include "XrdSys/XrdSysPlatform.hh" // same as above
47 #include "XrdSys/XrdSysAtomics.hh"
48 #include "XrdSys/XrdSysPthread.hh"
49 #include <memory>
50 #include <sstream>
51 #include <numeric>
52 
53 namespace
54 {
55  //----------------------------------------------------------------------------
56  // We need an extra task what will run the handler in the future, because
57  // tasks get deleted and we need the handler
58  //----------------------------------------------------------------------------
59  class WaitTask: public XrdCl::Task
60  {
61  public:
62  WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63  {
64  std::ostringstream o;
65  o << "WaitTask for: 0x" << handler->GetRequest();
66  SetName( o.str() );
67  }
68 
69  virtual time_t Run( time_t now )
70  {
71  pHandler->WaitDone( now );
72  return 0;
73  }
74  private:
75  XrdCl::XRootDMsgHandler *pHandler;
76  };
77 }
78 
79 namespace XrdCl
80 {
81  //----------------------------------------------------------------------------
82  // Delegate the response handling to the thread-pool
83  //----------------------------------------------------------------------------
84  class HandleRspJob: public XrdCl::Job
85  {
86  public:
87  HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88  {
89 
90  }
91 
92  virtual ~HandleRspJob()
93  {
94 
95  }
96 
97  virtual void Run( void *arg )
98  {
99  pHandler->HandleResponse();
100  delete this;
101  }
102  private:
103  XrdCl::XRootDMsgHandler *pHandler;
104  };
105 
106  //----------------------------------------------------------------------------
107  // Examine an incoming message, and decide on the action to be taken
108  //----------------------------------------------------------------------------
109  uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110  {
111  const int sst = pSendingState.fetch_or( kSawResp );
112 
113  if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114  {
115  // we must have been sent although we haven't got the OnStatusReady
116  // notification yet. Set the inflight notice.
117 
118  Log *log = DefaultEnv::GetLog();
119  log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120  "that it was sent, assuming it was sent ok.",
121  pUrl.GetHostId().c_str(),
122  pRequest->GetObfuscatedDescription().c_str() );
123 
124  pMsgInFly = true;
125  }
126 
127  //--------------------------------------------------------------------------
128  // if the MsgHandler is already being used to process another request
129  // (kXR_oksofar) we need to wait
130  //--------------------------------------------------------------------------
131  if( pOksofarAsAnswer )
132  {
133  XrdSysCondVarHelper lck( pCV );
134  while( pResponse ) pCV.Wait();
135  }
136  else
137  {
138  if( pResponse )
139  {
140  Log *log = DefaultEnv::GetLog();
141  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
142  "it already owns a response: %p (message: %s ).",
143  pUrl.GetHostId().c_str(), this,
144  pRequest->GetObfuscatedDescription().c_str() );
145  }
146  }
147 
148  if( msg->GetSize() < 8 )
149  return Ignore;
150 
151  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
152  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
153  uint16_t status = 0;
154  uint32_t dlen = 0;
155 
156  //--------------------------------------------------------------------------
157  // We only care about async responses, but those are extracted now
158  // in the SocketHandler.
159  //--------------------------------------------------------------------------
160  if( rsp->hdr.status == kXR_attn )
161  {
162  return Ignore;
163  }
164  //--------------------------------------------------------------------------
165  // We got a sync message - check if it belongs to us
166  //--------------------------------------------------------------------------
167  else
168  {
169  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
170  rsp->hdr.streamid[1] != req->header.streamid[1] )
171  return Ignore;
172 
173  status = rsp->hdr.status;
174  dlen = rsp->hdr.dlen;
175  }
176 
177  //--------------------------------------------------------------------------
178  // We take the ownership of the message and decide what we will do
179  // with the handler itself, the options are:
180  // 1) we want to either read in raw mode (the Raw flag) or have the message
181  // body reconstructed for us by the TransportHandler by the time
182  // Process() is called (default, no extra flag)
183  // 2) we either got a full response in which case we don't want to be
184  // notified about anything anymore (RemoveHandler) or we got a partial
185  // answer and we need to wait for more (default, no extra flag)
186  //--------------------------------------------------------------------------
187  pResponse = msg;
188  pBodyReader->SetDataLength( dlen );
189 
190  Log *log = DefaultEnv::GetLog();
191  switch( status )
192  {
193  //------------------------------------------------------------------------
194  // Handle the cached cases
195  //------------------------------------------------------------------------
196  case kXR_error:
197  case kXR_redirect:
198  case kXR_wait:
199  return RemoveHandler;
200 
201  case kXR_waitresp:
202  {
203  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
204  "message %s", pUrl.GetHostId().c_str(),
205  pRequest->GetObfuscatedDescription().c_str() );
206 
207  pResponse.reset();
208  return Ignore; // This must be handled synchronously!
209  }
210 
211  //------------------------------------------------------------------------
212  // Handle the potential raw cases
213  //------------------------------------------------------------------------
214  case kXR_ok:
215  {
216  //----------------------------------------------------------------------
217  // For kXR_read we read in raw mode
218  //----------------------------------------------------------------------
219  uint16_t reqId = ntohs( req->header.requestid );
220  if( reqId == kXR_read )
221  {
222  return Raw | RemoveHandler;
223  }
224 
225  //----------------------------------------------------------------------
226  // kXR_readv is the same as kXR_read
227  //----------------------------------------------------------------------
228  if( reqId == kXR_readv )
229  {
230  return Raw | RemoveHandler;
231  }
232 
233  //----------------------------------------------------------------------
234  // For everything else we just take what we got
235  //----------------------------------------------------------------------
236  return RemoveHandler;
237  }
238 
239  //------------------------------------------------------------------------
240  // kXR_oksofars are special, they are not full responses, so we reset
241  // the response pointer to 0 and add the message to the partial list
242  //------------------------------------------------------------------------
243  case kXR_oksofar:
244  {
245  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
246  "%s", pUrl.GetHostId().c_str(),
247  pRequest->GetObfuscatedDescription().c_str() );
248 
249  if( !pOksofarAsAnswer )
250  {
251  pPartialResps.emplace_back( std::move( pResponse ) );
252  }
253 
254  //----------------------------------------------------------------------
255  // For kXR_read we either read in raw mode if the message has not
256  // been fully reconstructed already, if it has, we adjust
257  // the buffer offset to prepare for the next one
258  //----------------------------------------------------------------------
259  uint16_t reqId = ntohs( req->header.requestid );
260  if( reqId == kXR_read )
261  {
262  pTimeoutFence.store( true, std::memory_order_relaxed );
263  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
264  }
265 
266  //----------------------------------------------------------------------
267  // kXR_readv is similar to read, except that the payload is different
268  //----------------------------------------------------------------------
269  if( reqId == kXR_readv )
270  {
271  pTimeoutFence.store( true, std::memory_order_relaxed );
272  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
273  }
274 
275  return ( pOksofarAsAnswer ? None : NoProcess );
276  }
277 
278  case kXR_status:
279  {
280  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
281  "%s", pUrl.GetHostId().c_str(),
282  pRequest->GetObfuscatedDescription().c_str() );
283 
284  uint16_t reqId = ntohs( req->header.requestid );
285  if( reqId == kXR_pgwrite )
286  {
287  //--------------------------------------------------------------------
288  // In case of pgwrite by definition this wont be a partial response
289  // so we can already remove the handler from the in-queue
290  //--------------------------------------------------------------------
291  return RemoveHandler;
292  }
293 
294  //----------------------------------------------------------------------
295  // Otherwise (pgread), first of all we need to read the body of the
296  // kXR_status response, we can handle the raw data (if any) only after
297  // we have the whole kXR_status body
298  //----------------------------------------------------------------------
299  pTimeoutFence.store( true, std::memory_order_relaxed );
300  return None;
301  }
302 
303  //------------------------------------------------------------------------
304  // Default
305  //------------------------------------------------------------------------
306  default:
307  return RemoveHandler;
308  }
309  return RemoveHandler;
310  }
311 
312  //----------------------------------------------------------------------------
313  // Reexamine the incoming message, and decide on the action to be taken
314  //----------------------------------------------------------------------------
316  {
317  if( !pResponse )
318  return 0;
319 
320  Log *log = DefaultEnv::GetLog();
321  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
322 
323  //--------------------------------------------------------------------------
324  // Additional action is only required for kXR_status
325  //--------------------------------------------------------------------------
326  if( rsp->hdr.status != kXR_status ) return 0;
327 
328  //--------------------------------------------------------------------------
329  // Ignore malformed status response
330  //--------------------------------------------------------------------------
331  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
332  {
333  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
334  return Corrupted;
335  }
336 
337  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
338  uint16_t reqId = ntohs( req->header.requestid );
339  //--------------------------------------------------------------------------
340  // Unmarshal the status body
341  //--------------------------------------------------------------------------
342  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
343 
344  if( !st.IsOK() && st.code == errDataError )
345  {
346  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
347  st.GetErrorMessage().c_str() );
348  return Corrupted;
349  }
350 
351  if( !st.IsOK() )
352  {
353  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
354  pUrl.GetHostId().c_str() );
355  pStatus = st;
356  HandleRspOrQueue();
357  return Ignore;
358  }
359 
360  //--------------------------------------------------------------------------
361  // Common handling for partial results
362  //--------------------------------------------------------------------------
363  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
365  {
366  pPartialResps.push_back( std::move( pResponse ) );
367  }
368 
369  //--------------------------------------------------------------------------
370  // Decide the actions that we need to take
371  //--------------------------------------------------------------------------
372  uint16_t action = 0;
373  if( reqId == kXR_pgread )
374  {
375  //----------------------------------------------------------------------
376  // The message contains only Status header and body but no raw data
377  //----------------------------------------------------------------------
378  if( !pPageReader )
379  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
380  pPageReader->SetRsp( rspst );
381 
382  action |= Raw;
383 
385  action |= NoProcess;
386  else
387  action |= RemoveHandler;
388  }
389  else if( reqId == kXR_pgwrite )
390  {
391  // if data corruption has been detected on the server side we will
392  // send some additional data pointing to the pages that need to be
393  // retransmitted
394  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
395  pResponse->GetCursor() )
396  action |= More;
397  }
398 
399  return action;
400  }
401 
402  //----------------------------------------------------------------------------
403  // Get handler sid
404  //----------------------------------------------------------------------------
405  uint16_t XRootDMsgHandler::GetSid() const
406  {
407  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
408  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
409  }
410 
411  //----------------------------------------------------------------------------
413  //----------------------------------------------------------------------------
415  {
416  Log *log = DefaultEnv::GetLog();
417 
418  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
419 
420  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
421 
422  //--------------------------------------------------------------------------
423  // If it is a local file, it can be only a metalink redirector
424  //--------------------------------------------------------------------------
425  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
426  pHosts->back().protocol = kXR_PROTOCOLVERSION;
427 
428  //--------------------------------------------------------------------------
429  // We got an answer, check who we were talking to
430  //--------------------------------------------------------------------------
431  else
432  {
433  AnyObject qryResult;
434  int *qryResponse = 0;
435  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
436  qryResult.Get( qryResponse );
437  pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
438  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
439  qryResult.Get( qryResponse );
440  pHosts->back().protocol = *qryResponse; delete qryResponse;
441  }
442 
443  //--------------------------------------------------------------------------
444  // Process the message
445  //--------------------------------------------------------------------------
446  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
447  if( !st.IsOK() )
448  {
449  pStatus = Status( stFatal, errInvalidMessage );
450  HandleResponse();
451  return;
452  }
453 
454  //--------------------------------------------------------------------------
455  // we have an response for the message so it's not in fly anymore
456  //--------------------------------------------------------------------------
457  pMsgInFly = false;
458 
459  //--------------------------------------------------------------------------
460  // Reset the aggregated wait (used to omit wait response in case of Metalink
461  // redirector)
462  //--------------------------------------------------------------------------
463  if( rsp->hdr.status != kXR_wait )
464  pAggregatedWaitTime = 0;
465 
466  switch( rsp->hdr.status )
467  {
468  //------------------------------------------------------------------------
469  // kXR_ok - we're done here
470  //------------------------------------------------------------------------
471  case kXR_ok:
472  {
473  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
474  pUrl.GetHostId().c_str(),
475  pRequest->GetObfuscatedDescription().c_str() );
476  pStatus = Status();
477  HandleResponse();
478  return;
479  }
480 
481  case kXR_status:
482  {
483  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
484  pUrl.GetHostId().c_str(),
485  pRequest->GetObfuscatedDescription().c_str() );
486  pStatus = Status();
487  HandleResponse();
488  return;
489  }
490 
491  //------------------------------------------------------------------------
492  // kXR_ok - we're serving partial result to the user
493  //------------------------------------------------------------------------
494  case kXR_oksofar:
495  {
496  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
497  pUrl.GetHostId().c_str(),
498  pRequest->GetObfuscatedDescription().c_str() );
499  pStatus = Status( stOK, suContinue );
500  HandleResponse();
501  return;
502  }
503 
504  //------------------------------------------------------------------------
505  // kXR_error - we've got a problem
506  //------------------------------------------------------------------------
507  case kXR_error:
508  {
509  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
510  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
511  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
512  "[%d] %s", pUrl.GetHostId().c_str(),
513  pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
514  errmsg );
515  delete [] errmsg;
516 
517  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
518  return;
519  }
520 
521  //------------------------------------------------------------------------
522  // kXR_redirect - they tell us to go elsewhere
523  //------------------------------------------------------------------------
524  case kXR_redirect:
525  {
526  if( rsp->hdr.dlen <= 4 )
527  {
528  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
529  pUrl.GetHostId().c_str() );
530  pStatus = Status( stError, errInvalidResponse );
531  HandleResponse();
532  return;
533  }
534 
535  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
536  urlInfoBuff[rsp->hdr.dlen-4] = 0;
537  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
538  std::string urlInfo = urlInfoBuff;
539  delete [] urlInfoBuff;
540  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
541  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
542  pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
543  rsp->body.redirect.port );
544 
545  //----------------------------------------------------------------------
546  // Check if we can proceed
547  //----------------------------------------------------------------------
548  if( !pRedirectCounter )
549  {
550  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
551  "message %s, the last known error is: %s",
552  pUrl.GetHostId().c_str(),
553  pRequest->GetObfuscatedDescription().c_str(),
554  pLastError.ToString().c_str() );
555 
556 
557  pStatus = Status( stFatal, errRedirectLimit );
558  HandleResponse();
559  return;
560  }
561  --pRedirectCounter;
562 
563  //----------------------------------------------------------------------
564  // Keep the info about this server if we still need to find a load
565  // balancer
566  //----------------------------------------------------------------------
567  uint32_t flags = pHosts->back().flags;
568  if( !pHasLoadBalancer )
569  {
570  if( flags & kXR_isManager )
571  {
572  //------------------------------------------------------------------
573  // If the current server is a meta manager then it supersedes
574  // any existing load balancer, otherwise we assign a load-balancer
575  // only if it has not been already assigned
576  //------------------------------------------------------------------
577  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
578  {
579  pLoadBalancer = pHosts->back();
580  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
581  "as a load-balancer for message %s",
582  pUrl.GetHostId().c_str(),
583  pRequest->GetObfuscatedDescription().c_str() );
584  HostList::iterator it;
585  for( it = pHosts->begin(); it != pHosts->end(); ++it )
586  it->loadBalancer = false;
587  pHosts->back().loadBalancer = true;
588  }
589  }
590  }
591 
592  //----------------------------------------------------------------------
593  // If the redirect comes from a data server safe the URL because
594  // in case of a failure we will use it as the effective data server URL
595  // for the tried CGI opaque info
596  //----------------------------------------------------------------------
597  if( flags & kXR_isServer )
598  pEffectiveDataServerUrl = new URL( pHosts->back().url );
599 
600  //----------------------------------------------------------------------
601  // Build the URL and check it's validity
602  //----------------------------------------------------------------------
603  std::vector<std::string> urlComponents;
604  std::string newCgi;
605  Utils::splitString( urlComponents, urlInfo, "?" );
606 
607  std::ostringstream o;
608 
609  o << urlComponents[0];
610  if( rsp->body.redirect.port > 0 )
611  o << ":" << rsp->body.redirect.port << "/";
612  else if( rsp->body.redirect.port < 0 )
613  {
614  //--------------------------------------------------------------------
615  // check if the manager wants to enforce write recovery at himself
616  // (beware we are dealing here with negative flags)
617  //--------------------------------------------------------------------
618  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
619  pHosts->back().flags |= kXR_recoverWrts;
620 
621  //--------------------------------------------------------------------
622  // check if the manager wants to collapse the communication channel
623  // (the redirect host is to replace the current host)
624  //--------------------------------------------------------------------
625  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
626  {
627  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
628  pPostMaster->CollapseRedirect( pUrl, url );
629  }
630 
631  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
632  {
633  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
634  if( Utils::CheckEC( pRequest, url ) )
635  pRedirectAsAnswer = true;
636  }
637  }
638 
639  URL newUrl = URL( o.str() );
640  if( !newUrl.IsValid() )
641  {
642  pStatus = Status( stError, errInvalidRedirectURL );
643  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
644  pUrl.GetHostId().c_str(), urlInfo.c_str() );
645  HandleResponse();
646  return;
647  }
648 
649  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
650  newUrl.SetUserName( pUrl.GetUserName() );
651 
652  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
653  newUrl.SetPassword( pUrl.GetPassword() );
654 
655  //----------------------------------------------------------------------
656  // Forward any "xrd.*" params from the original client request also to
657  // the new redirection url
658  // Also, we need to preserve any "xrdcl.*' as they are important for
659  // our internal workflows.
660  //----------------------------------------------------------------------
661  std::ostringstream ossXrd;
662  const URL::ParamsMap &urlParams = pUrl.GetParams();
663 
664  for(URL::ParamsMap::const_iterator it = urlParams.begin();
665  it != urlParams.end(); ++it )
666  {
667  if( it->first.compare( 0, 4, "xrd." ) &&
668  it->first.compare( 0, 6, "xrdcl." ) )
669  continue;
670 
671  ossXrd << it->first << '=' << it->second << '&';
672  }
673 
674  std::string xrdCgi = ossXrd.str();
675  pRedirectUrl = newUrl.GetURL();
676 
677  URL cgiURL;
678  if( urlComponents.size() > 1 )
679  {
680  pRedirectUrl += "?";
681  pRedirectUrl += urlComponents[1];
682  std::ostringstream o;
683  o << "fake://fake:111//fake?";
684  o << urlComponents[1];
685 
686  if( urlComponents.size() == 3 )
687  o << '?' << urlComponents[2];
688 
689  if (!xrdCgi.empty())
690  {
691  o << '&' << xrdCgi;
692  pRedirectUrl += '&';
693  pRedirectUrl += xrdCgi;
694  }
695 
696  cgiURL = URL( o.str() );
697  }
698  else {
699  if (!xrdCgi.empty())
700  {
701  std::ostringstream o;
702  o << "fake://fake:111//fake?";
703  o << xrdCgi;
704  cgiURL = URL( o.str() );
705  pRedirectUrl += '?';
706  pRedirectUrl += xrdCgi;
707  }
708  }
709 
710  //----------------------------------------------------------------------
711  // Check if we need to return the URL as a response
712  //----------------------------------------------------------------------
713  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
714  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
715  !newUrl.IsLocalFile() )
716  pRedirectAsAnswer = true;
717 
718  if( pRedirectAsAnswer )
719  {
720  pStatus = Status( stError, errRedirect );
721  HandleResponse();
722  return;
723  }
724 
725  //----------------------------------------------------------------------
726  // Rewrite the message in a way required to send it to another server
727  //----------------------------------------------------------------------
728  newUrl.SetParams( cgiURL.GetParams() );
729  Status st = RewriteRequestRedirect( newUrl );
730  if( !st.IsOK() )
731  {
732  pStatus = st;
733  HandleResponse();
734  return;
735  }
736 
737  //----------------------------------------------------------------------
738  // Make sure we don't change the protocol by accident (root vs roots)
739  //----------------------------------------------------------------------
740  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
741  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
742  newUrl.SetProtocol( "roots" );
743 
744  //----------------------------------------------------------------------
745  // Send the request to the new location
746  //----------------------------------------------------------------------
747  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
748  return;
749  }
750 
751  //------------------------------------------------------------------------
752  // kXR_wait - we wait, and re-issue the request later
753  //------------------------------------------------------------------------
754  case kXR_wait:
755  {
756  uint32_t waitSeconds = 0;
757 
758  if( rsp->hdr.dlen >= 4 )
759  {
760  char *infoMsg = new char[rsp->hdr.dlen-3];
761  infoMsg[rsp->hdr.dlen-4] = 0;
762  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
763  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
764  "message %s: %s", pUrl.GetHostId().c_str(),
765  rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
766  infoMsg );
767  delete [] infoMsg;
768  waitSeconds = rsp->body.wait.seconds;
769  }
770  else
771  {
772  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
773  "message %s", pUrl.GetHostId().c_str(),
774  pRequest->GetObfuscatedDescription().c_str() );
775  }
776 
777  pAggregatedWaitTime += waitSeconds;
778 
779  // We need a special case if the data node comes from metalink
780  // redirector. In this case it might make more sense to try the
781  // next entry in the Metalink than wait.
782  if( OmitWait( *pRequest, pLoadBalancer.url ) )
783  {
784  int maxWait = DefaultMaxMetalinkWait;
785  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
786  if( pAggregatedWaitTime > maxWait )
787  {
788  UpdateTriedCGI();
789  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
790  return;
791  }
792  }
793 
794  //----------------------------------------------------------------------
795  // Some messages require rewriting before they can be sent again
796  // after wait
797  //----------------------------------------------------------------------
798  Status st = RewriteRequestWait();
799  if( !st.IsOK() )
800  {
801  pStatus = st;
802  HandleResponse();
803  return;
804  }
805 
806  //----------------------------------------------------------------------
807  // Register a task to resend the message in some seconds, if we still
808  // have time to do that, and report a timeout otherwise
809  //----------------------------------------------------------------------
810  time_t resendTime = ::time(0)+waitSeconds;
811 
812  if( resendTime < pExpiration )
813  {
814  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
815  pUrl.GetHostId().c_str(), this,
816  pRequest->GetObfuscatedDescription().c_str() );
817 
818  TaskManager *taskMgr = pPostMaster->GetTaskManager();
819  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
820  }
821  else
822  {
823  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
824  pUrl.GetHostId().c_str(),
825  pRequest->GetObfuscatedDescription().c_str() );
826  HandleError( Status( stError, errOperationExpired) );
827  }
828  return;
829  }
830 
831  //------------------------------------------------------------------------
832  // kXR_waitresp - the response will be returned in some seconds as an
833  // unsolicited message. Currently all messages of this type are handled
834  // one step before in the XrdClStream::OnIncoming as they need to be
835  // processed synchronously.
836  //------------------------------------------------------------------------
837  case kXR_waitresp:
838  {
839  if( rsp->hdr.dlen < 4 )
840  {
841  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
842  pUrl.GetHostId().c_str() );
843  pStatus = Status( stError, errInvalidResponse );
844  HandleResponse();
845  return;
846  }
847 
848  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
849  "message %s", pUrl.GetHostId().c_str(),
850  rsp->body.waitresp.seconds,
851  pRequest->GetObfuscatedDescription().c_str() );
852  return;
853  }
854 
855  //------------------------------------------------------------------------
856  // Default - unrecognized/unsupported response, declare an error
857  //------------------------------------------------------------------------
858  default:
859  {
860  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
861  "message %s", pUrl.GetHostId().c_str(),
862  rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
863  pStatus = Status( stError, errInvalidResponse );
864  HandleResponse();
865  return;
866  }
867  }
868 
869  return;
870  }
871 
872  //----------------------------------------------------------------------------
873  // Handle an event other that a message arrival - may be timeout
874  //----------------------------------------------------------------------------
876  XRootDStatus status )
877  {
878  Log *log = DefaultEnv::GetLog();
879  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
880  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
881 
882  if( event == Ready )
883  return 0;
884 
885  if( pTimeoutFence.load( std::memory_order_relaxed ) )
886  return 0;
887 
888  HandleError( status );
889  return RemoveHandler;
890  }
891 
892  //----------------------------------------------------------------------------
893  // Read message body directly from a socket
894  //----------------------------------------------------------------------------
896  Socket *socket,
897  uint32_t &bytesRead )
898  {
899  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
900  uint16_t reqId = ntohs( req->header.requestid );
901 
902  if( reqId == kXR_pgread )
903  return pPageReader->Read( *socket, bytesRead );
904 
905  return pBodyReader->Read( *socket, bytesRead );
906  }
907 
908  //----------------------------------------------------------------------------
909  // We're here when we requested sending something over the wire
910  // and there has been a status update on this action
911  //----------------------------------------------------------------------------
913  XRootDStatus status )
914  {
915  Log *log = DefaultEnv::GetLog();
916 
917  const int sst = pSendingState.fetch_or( kSendDone );
918 
919  if( sst & kFinalResp )
920  {
921  log->Dump( XRootDMsg, "[%s] Got late notification that outgoing message %s was "
922  "sent, already have final response, queuing handler callback.",
923  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
924  HandleRspOrQueue();
925  return;
926  }
927 
928  if( sst & kSawResp )
929  {
930  log->Dump( XRootDMsg, "[%s] Got late notification that message %s has "
931  "been successfully sent.",
932  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
933  return;
934  }
935 
936  //--------------------------------------------------------------------------
937  // We were successful, so we now need to listen for a response
938  //--------------------------------------------------------------------------
939  if( status.IsOK() )
940  {
941  log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
942  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
943 
944  pMsgInFly = true;
945  return;
946  }
947 
948  //--------------------------------------------------------------------------
949  // We have failed, recover if possible
950  //--------------------------------------------------------------------------
951  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
952  "recover.", pUrl.GetHostId().c_str(),
953  message->GetObfuscatedDescription().c_str() );
954  HandleError( status );
955  }
956 
957  //----------------------------------------------------------------------------
958  // Are we a raw writer or not?
959  //----------------------------------------------------------------------------
961  {
962  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
963  uint16_t reqId = ntohs( req->header.requestid );
964  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
965  return true;
966  // checkpoint + execute
967  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
968  {
969  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
970  reqId = ntohs( xeq->header.requestid );
971  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
972  }
973 
974  return false;
975  }
976 
977  //----------------------------------------------------------------------------
978  // Write the message body
979  //----------------------------------------------------------------------------
981  uint32_t &bytesWritten )
982  {
983  //--------------------------------------------------------------------------
984  // First check if it is a PgWrite
985  //--------------------------------------------------------------------------
986  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
987  {
988  //------------------------------------------------------------------------
989  // PgWrite will have just one chunk
990  //------------------------------------------------------------------------
991  ChunkInfo chunk = pChunkList->front();
992  //------------------------------------------------------------------------
993  // Calculate the size of the first and last page (in case the chunk is not
994  // 4KB aligned)
995  //------------------------------------------------------------------------
996  int fLen = 0, lLen = 0;
997  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
998 
999  //------------------------------------------------------------------------
1000  // Set the crc32c buffer if not ready yet
1001  //------------------------------------------------------------------------
1002  if( pPgWrtCksumBuff.GetCursor() == 0 )
1003  {
1004  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1005  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1006  }
1007 
1008  uint32_t btsLeft = chunk.length - pAsyncOffset;
1009  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1010  if( pglen > btsLeft ) pglen = btsLeft;
1011  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1012 
1013  while( btsLeft > 0 )
1014  {
1015  // first write the crc32c digest
1016  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1017  {
1018  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1019  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1020  int btswrt = 0;
1021  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1022  if( !st.IsOK() ) return st;
1023  bytesWritten += btswrt;
1024  pPgWrtCksumBuff.AdvanceCursor( btswrt );
1025  if( st.code == suRetry ) return st;
1026  }
1027  // then write the raw data (one page)
1028  int btswrt = 0;
1029  Status st = socket->Send( pgbuf, pglen, btswrt );
1030  if( !st.IsOK() ) return st;
1031  pgbuf += btswrt;
1032  pglen -= btswrt;
1033  btsLeft -= btswrt;
1034  bytesWritten += btswrt;
1035  pAsyncOffset += btswrt; // update the offset to the raw data
1036  if( st.code == suRetry ) return st;
1037  // if we managed to write all the data ...
1038  if( pglen == 0 )
1039  {
1040  // move to the next page
1041  ++pPgWrtCurrentPageNb;
1042  if( pPgWrtCurrentPageNb < nbpgs )
1043  {
1044  // set the digest buffer
1045  pPgWrtCksumBuff.SetCursor( 0 );
1046  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1047  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1048  }
1049  // set the page length
1050  pglen = XrdSys::PageSize;
1051  if( pglen > btsLeft ) pglen = btsLeft;
1052  // reset offset in the current page
1053  pPgWrtCurrentPageOffset = 0;
1054  }
1055  else
1056  // otherwise just adjust the offset in the current page
1057  pPgWrtCurrentPageOffset += btswrt;
1058 
1059  }
1060  }
1061  else if( !pChunkList->empty() )
1062  {
1063  size_t size = pChunkList->size();
1064  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1065  {
1066  char *buffer = (char*)(*pChunkList)[i].buffer;
1067  uint32_t size = (*pChunkList)[i].length;
1068  size_t leftToBeWritten = size - pAsyncOffset;
1069 
1070  while( leftToBeWritten )
1071  {
1072  int btswrt = 0;
1073  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1074  bytesWritten += btswrt;
1075  if( !st.IsOK() || st.code == suRetry ) return st;
1076  pAsyncOffset += btswrt;
1077  leftToBeWritten -= btswrt;
1078  }
1079  //----------------------------------------------------------------------
1080  // Remember that we have moved to the next chunk, also clear the offset
1081  // within the buffer as we are going to move to a new one
1082  //----------------------------------------------------------------------
1083  ++pAsyncChunkIndex;
1084  pAsyncOffset = 0;
1085  }
1086  }
1087  else
1088  {
1089  Log *log = DefaultEnv::GetLog();
1090 
1091  //------------------------------------------------------------------------
1092  // If the socket is encrypted we cannot use a kernel buffer, we have to
1093  // convert to user space buffer
1094  //------------------------------------------------------------------------
1095  if( socket->IsEncrypted() )
1096  {
1097  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1098  pUrl.GetHostId().c_str() );
1099 
1100  char *ubuff = 0;
1101  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1102  if( ret < 0 ) return Status( stError, errInternal );
1103  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1104  return WriteMessageBody( socket, bytesWritten );
1105  }
1106 
1107  //------------------------------------------------------------------------
1108  // Send the data
1109  //------------------------------------------------------------------------
1110  while( !pKBuff->Empty() )
1111  {
1112  int btswrt = 0;
1113  Status st = socket->Send( *pKBuff, btswrt );
1114  bytesWritten += btswrt;
1115  if( !st.IsOK() || st.code == suRetry ) return st;
1116  }
1117 
1118  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1119  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1120  }
1121 
1122  return Status();
1123  }
1124 
1125  //----------------------------------------------------------------------------
1126  // We're here when we got a time event. We needed to re-issue the request
1127  // in some time in the future, and that moment has arrived
1128  //----------------------------------------------------------------------------
1130  {
1131  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1132  }
1133 
1134  //----------------------------------------------------------------------------
1135  // Bookkeeping after partial response has been received.
1136  //----------------------------------------------------------------------------
1138  {
1139  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1140  }
1141 
1142  //----------------------------------------------------------------------------
1143  // Unpack the message and call the response handler
1144  //----------------------------------------------------------------------------
1145  void XRootDMsgHandler::HandleResponse()
1146  {
1147  //--------------------------------------------------------------------------
1148  // Is it a final response?
1149  //--------------------------------------------------------------------------
1150  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1151  if( finalrsp )
1152  {
1153  // Do not do final processing of the response if we haven't had
1154  // confirmation the original request was sent (via OnStatusReady).
1155  // The final processing will be triggered when we get the confirm.
1156  const int sst = pSendingState.fetch_or( kFinalResp );
1157  if( !( sst & kSendDone ) )
1158  return;
1159  }
1160 
1161  //--------------------------------------------------------------------------
1162  // Process the response and notify the listener
1163  //--------------------------------------------------------------------------
1165  XRootDStatus *status = ProcessStatus();
1166  AnyObject *response = 0;
1167 
1168  Log *log = DefaultEnv::GetLog();
1169  log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1170  "with status: %s.",
1171  pUrl.GetHostId().c_str(), this,
1172  pRequest->GetObfuscatedDescription().c_str(),
1173  status->ToString().c_str() );
1174 
1175  if( status->IsOK() )
1176  {
1177  Status st = ParseResponse( response );
1178  if( !st.IsOK() )
1179  {
1180  delete status;
1181  delete response;
1182  status = new XRootDStatus( st );
1183  response = 0;
1184  }
1185  }
1186 
1187  //--------------------------------------------------------------------------
1188  // Close the redirect entry if necessary
1189  //--------------------------------------------------------------------------
1190  if( pRdirEntry )
1191  {
1192  pRdirEntry->status = *status;
1193  pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1194  }
1195 
1196  //--------------------------------------------------------------------------
1197  // Release the stream id
1198  //--------------------------------------------------------------------------
1199  if( pSidMgr && finalrsp )
1200  {
1201  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1202  if( status->IsOK() || !pMsgInFly ||
1203  !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1204  pSidMgr->ReleaseSID( req->header.streamid );
1205  }
1206 
1207  HostList *hosts = pHosts.release();
1208  if( !finalrsp )
1209  pHosts.reset( new HostList( *hosts ) );
1210 
1211  pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1212 
1213  //--------------------------------------------------------------------------
1214  // if it is the final response there is nothing more to do ...
1215  //--------------------------------------------------------------------------
1216  if( finalrsp )
1217  delete this;
1218  //--------------------------------------------------------------------------
1219  // on the other hand if it is not the final response, we have to keep the
1220  // MsgHandler and delete the current response
1221  //--------------------------------------------------------------------------
1222  else
1223  {
1224  XrdSysCondVarHelper lck( pCV );
1225  pResponse.reset();
1226  pTimeoutFence.store( false, std::memory_order_relaxed );
1227  pCV.Broadcast();
1228  }
1229  }
1230 
1231 
1232  //----------------------------------------------------------------------------
1233  // Extract the status information from the stuff that we got
1234  //----------------------------------------------------------------------------
1235  XRootDStatus *XRootDMsgHandler::ProcessStatus()
1236  {
1237  XRootDStatus *st = new XRootDStatus( pStatus );
1238  ServerResponse *rsp = 0;
1239  if( pResponse )
1240  rsp = (ServerResponse *)pResponse->GetBuffer();
1241 
1242  if( !pStatus.IsOK() && rsp )
1243  {
1244  if( pStatus.code == errErrorResponse )
1245  {
1246  st->errNo = rsp->body.error.errnum;
1247  // omit the last character as the string returned from the server
1248  // (acording to protocol specs) should be null-terminated
1249  std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1250  if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1251  errmsg += " Last seen error: " + pLastError.ToString();
1252  st->SetErrorMessage( errmsg );
1253  }
1254  else if( pStatus.code == errRedirect )
1255  st->SetErrorMessage( pRedirectUrl );
1256  }
1257  return st;
1258  }
1259 
1260  //------------------------------------------------------------------------
1261  // Parse the response and put it in an object that could be passed to
1262  // the user
1263  //------------------------------------------------------------------------
1264  Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1265  {
1266  if( !pResponse )
1267  return Status();
1268 
1269  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1270  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1271  Log *log = DefaultEnv::GetLog();
1272 
1273  //--------------------------------------------------------------------------
1274  // Handle redirect as an answer
1275  //--------------------------------------------------------------------------
1276  if( rsp->hdr.status == kXR_redirect )
1277  {
1278  log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1279  return 0;
1280  }
1281 
1282  Buffer buff;
1283  uint32_t length = 0;
1284  char *buffer = 0;
1285 
1286  //--------------------------------------------------------------------------
1287  // We don't have any partial answers so pass what we have
1288  //--------------------------------------------------------------------------
1289  if( pPartialResps.empty() )
1290  {
1291  buffer = rsp->body.buffer.data;
1292  length = rsp->hdr.dlen;
1293  }
1294  //--------------------------------------------------------------------------
1295  // Partial answers, we need to glue them together before parsing
1296  //--------------------------------------------------------------------------
1297  else if( req->header.requestid != kXR_read &&
1298  req->header.requestid != kXR_readv )
1299  {
1300  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1301  {
1302  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1303  length += part->hdr.dlen;
1304  }
1305  length += rsp->hdr.dlen;
1306 
1307  buff.Allocate( length );
1308  uint32_t offset = 0;
1309  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1310  {
1311  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1312  buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1313  offset += part->hdr.dlen;
1314  }
1315  buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1316  buffer = buff.GetBuffer();
1317  }
1318 
1319  //--------------------------------------------------------------------------
1320  // Right, but what was the question?
1321  //--------------------------------------------------------------------------
1322  switch( req->header.requestid )
1323  {
1324  //------------------------------------------------------------------------
1325  // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1326  // kXR_ping, kXR_close, kXR_write, kXR_sync
1327  //------------------------------------------------------------------------
1328  case kXR_mv:
1329  case kXR_truncate:
1330  case kXR_rm:
1331  case kXR_mkdir:
1332  case kXR_rmdir:
1333  case kXR_chmod:
1334  case kXR_ping:
1335  case kXR_close:
1336  case kXR_write:
1337  case kXR_writev:
1338  case kXR_sync:
1339  case kXR_chkpoint:
1340  return Status();
1341 
1342  //------------------------------------------------------------------------
1343  // kXR_locate
1344  //------------------------------------------------------------------------
1345  case kXR_locate:
1346  {
1347  AnyObject *obj = new AnyObject();
1348 
1349  char *nullBuffer = new char[length+1];
1350  nullBuffer[length] = 0;
1351  memcpy( nullBuffer, buffer, length );
1352 
1353  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1354  "LocateInfo: %s", pUrl.GetHostId().c_str(),
1355  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1356  LocationInfo *data = new LocationInfo();
1357 
1358  if( data->ParseServerResponse( nullBuffer ) == false )
1359  {
1360  delete obj;
1361  delete data;
1362  delete [] nullBuffer;
1363  return Status( stError, errInvalidResponse );
1364  }
1365  delete [] nullBuffer;
1366 
1367  obj->Set( data );
1368  response = obj;
1369  return Status();
1370  }
1371 
1372  //------------------------------------------------------------------------
1373  // kXR_stat
1374  //------------------------------------------------------------------------
1375  case kXR_stat:
1376  {
1377  AnyObject *obj = new AnyObject();
1378 
1379  //----------------------------------------------------------------------
1380  // Virtual File System stat (kXR_vfs)
1381  //----------------------------------------------------------------------
1382  if( req->stat.options & kXR_vfs )
1383  {
1384  StatInfoVFS *data = new StatInfoVFS();
1385 
1386  char *nullBuffer = new char[length+1];
1387  nullBuffer[length] = 0;
1388  memcpy( nullBuffer, buffer, length );
1389 
1390  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1391  "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1392  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1393 
1394  if( data->ParseServerResponse( nullBuffer ) == false )
1395  {
1396  delete obj;
1397  delete data;
1398  delete [] nullBuffer;
1399  return Status( stError, errInvalidResponse );
1400  }
1401  delete [] nullBuffer;
1402 
1403  obj->Set( data );
1404  }
1405  //----------------------------------------------------------------------
1406  // Normal stat
1407  //----------------------------------------------------------------------
1408  else
1409  {
1410  StatInfo *data = new StatInfo();
1411 
1412  char *nullBuffer = new char[length+1];
1413  nullBuffer[length] = 0;
1414  memcpy( nullBuffer, buffer, length );
1415 
1416  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1417  "%s", pUrl.GetHostId().c_str(),
1418  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1419 
1420  if( data->ParseServerResponse( nullBuffer ) == false )
1421  {
1422  delete obj;
1423  delete data;
1424  delete [] nullBuffer;
1425  return Status( stError, errInvalidResponse );
1426  }
1427  delete [] nullBuffer;
1428  obj->Set( data );
1429  }
1430 
1431  response = obj;
1432  return Status();
1433  }
1434 
1435  //------------------------------------------------------------------------
1436  // kXR_protocol
1437  //------------------------------------------------------------------------
1438  case kXR_protocol:
1439  {
1440  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1441  pUrl.GetHostId().c_str(),
1442  pRequest->GetObfuscatedDescription().c_str() );
1443 
1444  if( rsp->hdr.dlen < 8 )
1445  {
1446  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1447  pUrl.GetHostId().c_str() );
1448  return Status( stError, errInvalidResponse );
1449  }
1450 
1451  AnyObject *obj = new AnyObject();
1452  ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1453  rsp->body.protocol.flags );
1454  obj->Set( data );
1455  response = obj;
1456  return Status();
1457  }
1458 
1459  //------------------------------------------------------------------------
1460  // kXR_dirlist
1461  //------------------------------------------------------------------------
1462  case kXR_dirlist:
1463  {
1464  AnyObject *obj = new AnyObject();
1465  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1466  "DirectoryList", pUrl.GetHostId().c_str(),
1467  pRequest->GetObfuscatedDescription().c_str() );
1468 
1469  char *path = new char[req->dirlist.dlen+1];
1470  path[req->dirlist.dlen] = 0;
1471  memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1472 
1473  DirectoryList *data = new DirectoryList();
1474  data->SetParentName( path );
1475  delete [] path;
1476 
1477  char *nullBuffer = new char[length+1];
1478  nullBuffer[length] = 0;
1479  memcpy( nullBuffer, buffer, length );
1480 
1481  bool invalidrsp = false;
1482 
1483  if( !pDirListStarted )
1484  {
1485  pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1486  pDirListStarted = true;
1487 
1488  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1489  }
1490  else
1491  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1492 
1493  if( invalidrsp )
1494  {
1495  delete data;
1496  delete obj;
1497  delete [] nullBuffer;
1498  return Status( stError, errInvalidResponse );
1499  }
1500 
1501  delete [] nullBuffer;
1502  obj->Set( data );
1503  response = obj;
1504  return Status();
1505  }
1506 
1507  //------------------------------------------------------------------------
1508  // kXR_open - if we got the statistics, otherwise return 0
1509  //------------------------------------------------------------------------
1510  case kXR_open:
1511  {
1512  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1513  pUrl.GetHostId().c_str(),
1514  pRequest->GetObfuscatedDescription().c_str() );
1515 
1516  if( rsp->hdr.dlen < 4 )
1517  {
1518  log->Error( XRootDMsg, "[%s] Got invalid open response.",
1519  pUrl.GetHostId().c_str() );
1520  return Status( stError, errInvalidResponse );
1521  }
1522 
1523  AnyObject *obj = new AnyObject();
1524  StatInfo *statInfo = 0;
1525 
1526  //----------------------------------------------------------------------
1527  // Handle StatInfo if requested
1528  //----------------------------------------------------------------------
1529  if( req->open.options & kXR_retstat )
1530  {
1531  log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1532  pUrl.GetHostId().c_str(),
1533  pRequest->GetObfuscatedDescription().c_str() );
1534 
1535  if( rsp->hdr.dlen >= 12 )
1536  {
1537  char *nullBuffer = new char[rsp->hdr.dlen-11];
1538  nullBuffer[rsp->hdr.dlen-12] = 0;
1539  memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1540 
1541  statInfo = new StatInfo();
1542  if( statInfo->ParseServerResponse( nullBuffer ) == false )
1543  {
1544  delete statInfo;
1545  statInfo = 0;
1546  }
1547  delete [] nullBuffer;
1548  }
1549 
1550  if( rsp->hdr.dlen < 12 || !statInfo )
1551  {
1552  log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1553  "to %s", pUrl.GetHostId().c_str(),
1554  pRequest->GetObfuscatedDescription().c_str() );
1555  delete obj;
1556  return Status( stError, errInvalidResponse );
1557  }
1558  }
1559 
1560  OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1561  pResponse->GetSessionId(),
1562  statInfo );
1563  obj->Set( data );
1564  response = obj;
1565  return Status();
1566  }
1567 
1568  //------------------------------------------------------------------------
1569  // kXR_read
1570  //------------------------------------------------------------------------
1571  case kXR_read:
1572  {
1573  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1574  pUrl.GetHostId().c_str(),
1575  pRequest->GetObfuscatedDescription().c_str() );
1576 
1577  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1578  {
1579  //--------------------------------------------------------------------
1580  // we are expecting to have only the header in the message, the raw
1581  // data have been readout into the user buffer
1582  //--------------------------------------------------------------------
1583  if( pPartialResps[i]->GetSize() > 8 )
1584  return Status( stOK, errInternal );
1585  }
1586  //----------------------------------------------------------------------
1587  // we are expecting to have only the header in the message, the raw
1588  // data have been readout into the user buffer
1589  //----------------------------------------------------------------------
1590  if( pResponse->GetSize() > 8 )
1591  return Status( stOK, errInternal );
1592  //----------------------------------------------------------------------
1593  // Get the response for the end user
1594  //----------------------------------------------------------------------
1595  return pBodyReader->GetResponse( response );
1596  }
1597 
1598  //------------------------------------------------------------------------
1599  // kXR_pgread
1600  //------------------------------------------------------------------------
1601  case kXR_pgread:
1602  {
1603  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1604  pUrl.GetHostId().c_str(),
1605  pRequest->GetObfuscatedDescription().c_str() );
1606 
1607  //----------------------------------------------------------------------
1608  // Glue in the cached responses if necessary
1609  //----------------------------------------------------------------------
1610  ChunkInfo chunk = pChunkList->front();
1611  bool sizeMismatch = false;
1612  uint32_t currentOffset = 0;
1613  char *cursor = (char*)chunk.buffer;
1614  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1615  {
1616  ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1617 
1618  //--------------------------------------------------------------------
1619  // the actual size of the raw data without the crc32c checksums
1620  //--------------------------------------------------------------------
1621  size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1622  part->status.bdy.dlen ) * CksumSize;
1623 
1624  if( currentOffset + datalen > chunk.length )
1625  {
1626  sizeMismatch = true;
1627  break;
1628  }
1629 
1630  currentOffset += datalen;
1631  cursor += datalen;
1632  }
1633 
1634  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1635  size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1636  rspst->status.bdy.dlen ) * CksumSize;
1637  if( currentOffset + datalen <= chunk.length )
1638  currentOffset += datalen;
1639  else
1640  sizeMismatch = true;
1641 
1642  //----------------------------------------------------------------------
1643  // Overflow
1644  //----------------------------------------------------------------------
1645  if( pChunkStatus.front().sizeError || sizeMismatch )
1646  {
1647  log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1648  "buffer is too small for the received data.",
1649  pUrl.GetHostId().c_str(),
1650  pRequest->GetObfuscatedDescription().c_str() );
1651  return Status( stError, errInvalidResponse );
1652  }
1653 
1654  AnyObject *obj = new AnyObject();
1655  PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1656  std::move( pCrc32cDigests) );
1657 
1658  obj->Set( pgInfo );
1659  response = obj;
1660  return Status();
1661  }
1662 
1663  //------------------------------------------------------------------------
1664  // kXR_pgwrite
1665  //------------------------------------------------------------------------
1666  case kXR_pgwrite:
1667  {
1668  std::vector<std::tuple<uint64_t, uint32_t>> retries;
1669 
1670  ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1671  if( rsp->status.bdy.dlen > 0 )
1672  {
1673  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1674  size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1675  retries.reserve( pgcnt );
1676  kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1677  sizeof( ServerResponseBody_pgWrCSE ) );
1678 
1679  for( size_t i = 0; i < pgcnt; ++i )
1680  {
1681  uint32_t len = XrdSys::PageSize;
1682  if( i == 0 ) len = cse->dlFirst;
1683  else if( i == pgcnt - 1 ) len = cse->dlLast;
1684  retries.push_back( std::make_tuple( pgoffs[i], len ) );
1685  }
1686  }
1687 
1688  RetryInfo *info = new RetryInfo( std::move( retries ) );
1689  AnyObject *obj = new AnyObject();
1690  obj->Set( info );
1691  response = obj;
1692 
1693  return Status();
1694  }
1695 
1696 
1697  //------------------------------------------------------------------------
1698  // kXR_readv - we need to pass the length of the buffer to the user code
1699  //------------------------------------------------------------------------
1700  case kXR_readv:
1701  {
1702  log->Dump( XRootDMsg, "[%s] Parsing the response to %p as "
1703  "VectorReadInfo", pUrl.GetHostId().c_str(),
1704  pRequest->GetObfuscatedDescription().c_str() );
1705 
1706  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1707  {
1708  //--------------------------------------------------------------------
1709  // we are expecting to have only the header in the message, the raw
1710  // data have been readout into the user buffer
1711  //--------------------------------------------------------------------
1712  if( pPartialResps[i]->GetSize() > 8 )
1713  return Status( stOK, errInternal );
1714  }
1715  //----------------------------------------------------------------------
1716  // we are expecting to have only the header in the message, the raw
1717  // data have been readout into the user buffer
1718  //----------------------------------------------------------------------
1719  if( pResponse->GetSize() > 8 )
1720  return Status( stOK, errInternal );
1721  //----------------------------------------------------------------------
1722  // Get the response for the end user
1723  //----------------------------------------------------------------------
1724  return pBodyReader->GetResponse( response );
1725  }
1726 
1727  //------------------------------------------------------------------------
1728  // kXR_fattr
1729  //------------------------------------------------------------------------
1730  case kXR_fattr:
1731  {
1732  int len = rsp->hdr.dlen;
1733  char* data = rsp->body.buffer.data;
1734 
1735  return ParseXAttrResponse( data, len, response );
1736  }
1737 
1738  //------------------------------------------------------------------------
1739  // kXR_query
1740  //------------------------------------------------------------------------
1741  case kXR_query:
1742  case kXR_set:
1743  case kXR_prepare:
1744  default:
1745  {
1746  AnyObject *obj = new AnyObject();
1747  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1748  pUrl.GetHostId().c_str(),
1749  pRequest->GetObfuscatedDescription().c_str() );
1750 
1751  BinaryDataInfo *data = new BinaryDataInfo();
1752  data->Allocate( length );
1753  data->Append( buffer, length );
1754  obj->Set( data );
1755  response = obj;
1756  return Status();
1757  }
1758  };
1759  return Status( stError, errInvalidMessage );
1760  }
1761 
1762  //------------------------------------------------------------------------
1763  // Parse the response to kXR_fattr request and put it in an object that
1764  // could be passed to the user
1765  //------------------------------------------------------------------------
1766  Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1767  AnyObject *&response )
1768  {
1769  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1770 // Log *log = DefaultEnv::GetLog(); //TODO
1771 
1772  switch( req->fattr.subcode )
1773  {
1774  case kXR_fattrDel:
1775  case kXR_fattrSet:
1776  {
1777  Status status;
1778 
1779  kXR_char nerrs = 0;
1780  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1781  return status;
1782 
1783  kXR_char nattr = 0;
1784  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1785  return status;
1786 
1787  std::vector<XAttrStatus> resp;
1788  // read the namevec
1789  for( kXR_char i = 0; i < nattr; ++i )
1790  {
1791  kXR_unt16 rc = 0;
1792  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1793  return status;
1794  rc = ntohs( rc );
1795 
1796  // count errors
1797  if( rc ) --nerrs;
1798 
1799  std::string name;
1800  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1801  return status;
1802 
1803  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1804  XRootDStatus();
1805  resp.push_back( XAttrStatus( name, st ) );
1806  }
1807 
1808  // check if we read all the data and if the error count is OK
1809  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1810 
1811  // set up the response object
1812  response = new AnyObject();
1813  response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1814 
1815  return Status();
1816  }
1817 
1818  case kXR_fattrGet:
1819  {
1820  Status status;
1821 
1822  kXR_char nerrs = 0;
1823  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1824  return status;
1825 
1826  kXR_char nattr = 0;
1827  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1828  return status;
1829 
1830  std::vector<XAttr> resp;
1831  resp.reserve( nattr );
1832 
1833  // read the name vec
1834  for( kXR_char i = 0; i < nattr; ++i )
1835  {
1836  kXR_unt16 rc = 0;
1837  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1838  return status;
1839  rc = ntohs( rc );
1840 
1841  // count errors
1842  if( rc ) --nerrs;
1843 
1844  std::string name;
1845  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1846  return status;
1847 
1848  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1849  XRootDStatus();
1850  resp.push_back( XAttr( name, st ) );
1851  }
1852 
1853  // read the value vec
1854  for( kXR_char i = 0; i < nattr; ++i )
1855  {
1856  kXR_int32 vlen = 0;
1857  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1858  return status;
1859  vlen = ntohl( vlen );
1860 
1861  std::string value;
1862  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1863  return status;
1864 
1865  resp[i].value.swap( value );
1866  }
1867 
1868  // check if we read all the data and if the error count is OK
1869  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1870 
1871  // set up the response object
1872  response = new AnyObject();
1873  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1874 
1875  return Status();
1876  }
1877 
1878  case kXR_fattrList:
1879  {
1880  Status status;
1881  std::vector<XAttr> resp;
1882 
1883  while( len > 0 )
1884  {
1885  std::string name;
1886  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1887  return status;
1888 
1889  kXR_int32 vlen = 0;
1890  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1891  return status;
1892  vlen = ntohl( vlen );
1893 
1894  std::string value;
1895  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1896  return status;
1897 
1898  resp.push_back( XAttr( name, value ) );
1899  }
1900 
1901  // set up the response object
1902  response = new AnyObject();
1903  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1904 
1905  return Status();
1906  }
1907 
1908  default:
1909  return Status( stError, errDataError );
1910  }
1911  }
1912 
1913  //----------------------------------------------------------------------------
1914  // Perform the changes to the original request needed by the redirect
1915  // procedure - allocate new streamid, append redirection data and such
1916  //----------------------------------------------------------------------------
1917  Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1918  {
1919  Log *log = DefaultEnv::GetLog();
1920 
1921  Status st;
1922  // Append any "xrd.*" parameters present in newCgi so that any authentication
1923  // requirements are properly enforced
1924  const URL::ParamsMap &newCgi = newUrl.GetParams();
1925  std::string xrdCgi = "";
1926  std::ostringstream ossXrd;
1927  for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1928  {
1929  if( it->first.compare( 0, 4, "xrd." ) )
1930  continue;
1931  ossXrd << it->first << '=' << it->second << '&';
1932  }
1933 
1934  xrdCgi = ossXrd.str();
1935  // Redirection URL containing also any original xrd.* opaque parameters
1936  XrdCl::URL authUrl;
1937 
1938  if (xrdCgi.empty())
1939  {
1940  authUrl = newUrl;
1941  }
1942  else
1943  {
1944  std::string surl = newUrl.GetURL();
1945  (surl.find('?') == std::string::npos) ? (surl += '?') :
1946  ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1947  surl += xrdCgi;
1948  if (!authUrl.FromString(surl))
1949  {
1950  std::string surlLog = surl;
1951  if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1952  surlLog = obfuscateAuth(surlLog);
1953  }
1954  log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1955  newUrl.GetHostId().c_str(), surl.c_str());
1956  return Status(stError, errInvalidRedirectURL);
1957  }
1958  }
1959 
1960  //--------------------------------------------------------------------------
1961  // Rewrite particular requests
1962  //--------------------------------------------------------------------------
1964  MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1966  return Status();
1967  }
1968 
1969  //----------------------------------------------------------------------------
1970  // Some requests need to be rewritten also after getting kXR_wait
1971  //----------------------------------------------------------------------------
1972  Status XRootDMsgHandler::RewriteRequestWait()
1973  {
1974  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1975 
1977 
1978  //------------------------------------------------------------------------
1979  // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
1980  // turned off after wait
1981  //------------------------------------------------------------------------
1982  switch( req->header.requestid )
1983  {
1984  case kXR_locate:
1985  {
1986  uint16_t refresh = kXR_refresh;
1987  req->locate.options &= (~refresh);
1988  break;
1989  }
1990 
1991  case kXR_open:
1992  {
1993  uint16_t refresh = kXR_refresh;
1994  req->locate.options &= (~refresh);
1995  break;
1996  }
1997  }
1998 
1999  XRootDTransport::SetDescription( pRequest );
2001  return Status();
2002  }
2003 
2004  //----------------------------------------------------------------------------
2005  // Recover error
2006  //----------------------------------------------------------------------------
2007  void XRootDMsgHandler::HandleError( XRootDStatus status )
2008  {
2009  //--------------------------------------------------------------------------
2010  // If there was no error then do nothing
2011  //--------------------------------------------------------------------------
2012  if( status.IsOK() )
2013  return;
2014 
2015  if( pSidMgr && pMsgInFly && (
2016  status.code == errOperationExpired ||
2017  status.code == errOperationInterrupted ) )
2018  {
2019  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2020  pSidMgr->TimeOutSID( req->header.streamid );
2021  }
2022 
2023  bool noreplicas = ( status.code == errErrorResponse &&
2024  status.errNo == kXR_noReplicas );
2025 
2026  if( !noreplicas ) pLastError = status;
2027 
2028  Log *log = DefaultEnv::GetLog();
2029  log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
2030  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
2031  status.ToString().c_str() );
2032 
2033  //--------------------------------------------------------------------------
2034  // Check if it is a fatal TLS error that has been marked as potentially
2035  // recoverable, if yes check if we can downgrade from fatal to error.
2036  //--------------------------------------------------------------------------
2037  if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
2038  {
2039  if( pSslErrCnt < MaxSslErrRetry )
2040  {
2041  status.status &= ~stFatal; // switch off fatal&error bits
2042  status.status |= stError; // switch on error bit
2043  }
2044  ++pSslErrCnt; // count number of consecutive SSL errors
2045  }
2046  else
2047  pSslErrCnt = 0;
2048 
2049  //--------------------------------------------------------------------------
2050  // We have got an error message, we can recover it at the load balancer if:
2051  // 1) we haven't got it from the load balancer
2052  // 2) we have a load balancer assigned
2053  // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2054  // kXR_NotFound
2055  // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2056  //--------------------------------------------------------------------------
2057  if( status.code == errErrorResponse )
2058  {
2059  if( RetriableErrorResponse( status ) )
2060  {
2061  UpdateTriedCGI(status.errNo);
2062  if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2063  SwitchOnRefreshFlag();
2064  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2065  return;
2066  }
2067  else
2068  {
2069  pStatus = status;
2070  HandleRspOrQueue();
2071  return;
2072  }
2073  }
2074 
2075  //--------------------------------------------------------------------------
2076  // Nothing can be done if:
2077  // 1) a user timeout has occurred
2078  // 2) has a non-zero session id
2079  // 3) if another error occurred and the validity of the message expired
2080  //--------------------------------------------------------------------------
2081  if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2082  status.code == errOperationInterrupted || time(0) >= pExpiration )
2083  {
2084  log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2085  pUrl.GetHostId().c_str(),
2086  pRequest->GetObfuscatedDescription().c_str() );
2087  pStatus = status;
2088  HandleRspOrQueue();
2089  return;
2090  }
2091 
2092  //--------------------------------------------------------------------------
2093  // At this point we're left with connection errors, we recover them
2094  // at a load balancer if we have one and if not on the current server
2095  // until we get a response, an unrecoverable error or a timeout
2096  //--------------------------------------------------------------------------
2097  if( pLoadBalancer.url.IsValid() &&
2098  pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2099  {
2100  UpdateTriedCGI( kXR_ServerError );
2101  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2102  return;
2103  }
2104  else
2105  {
2106  if( !status.IsFatal() && IsRetriable() )
2107  {
2108  log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2109  pUrl.GetHostId().c_str(),
2110  pRequest->GetObfuscatedDescription().c_str() );
2111 
2112  UpdateTriedCGI( kXR_ServerError );
2113  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2114  return;
2115  }
2116  pStatus = status;
2117  HandleRspOrQueue();
2118  return;
2119  }
2120  }
2121 
2122  //----------------------------------------------------------------------------
2123  // Retry the message at another server
2124  //----------------------------------------------------------------------------
2125  Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2126  {
2127  pResponse.reset();
2128  Log *log = DefaultEnv::GetLog();
2129 
2130  //--------------------------------------------------------------------------
2131  // Set up a redirect entry
2132  //--------------------------------------------------------------------------
2133  if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2134  pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2135 
2136  if( pUrl.GetLocation() != url.GetLocation() )
2137  {
2138  pHosts->push_back( url );
2139 
2140  //------------------------------------------------------------------------
2141  // Assign a new stream id to the message
2142  //------------------------------------------------------------------------
2143 
2144  // first release the old stream id
2145  // (though it could be a redirect from a local
2146  // metalink file, in this case there's no SID)
2147  ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2148  if( pSidMgr )
2149  {
2150  pSidMgr->ReleaseSID( req->streamid );
2151  pSidMgr.reset();
2152  }
2153 
2154  // then get the new SIDManager
2155  // (again this could be a redirect to a local
2156  // file and in this case there is no SID)
2157  if( !url.IsLocalFile() )
2158  {
2159  pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2160  Status st = pSidMgr->AllocateSID( req->streamid );
2161  if( !st.IsOK() )
2162  {
2163  log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2164  pUrl.GetHostId().c_str(),
2165  pRequest->GetObfuscatedDescription().c_str() );
2166  return st;
2167  }
2168  }
2169 
2170  pUrl = url;
2171  }
2172 
2173  if( pUrl.IsMetalink() && pFollowMetalink )
2174  {
2175  log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2176  pUrl.GetHostId().c_str(), this,
2177  pRequest->GetObfuscatedDescription().c_str() );
2178 
2179  return pPostMaster->Redirect( pUrl, pRequest, this );
2180  }
2181  else if( pUrl.IsLocalFile() )
2182  {
2183  HandleLocalRedirect( &pUrl );
2184  return Status();
2185  }
2186  else
2187  {
2188  log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2189  pUrl.GetHostId().c_str(), this,
2190  pRequest->GetObfuscatedDescription().c_str() );
2191  return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2192  }
2193  }
2194 
2195  //----------------------------------------------------------------------------
2196  // Update the "tried=" part of the CGI of the current message
2197  //----------------------------------------------------------------------------
2198  void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2199  {
2200  URL::ParamsMap cgi;
2201  std::string tried;
2202 
2203  //--------------------------------------------------------------------------
2204  // In case a data server responded with a kXR_redirect and we fail at the
2205  // node where we were redirected to, the original data server should be
2206  // included in the tried CGI opaque info (instead of the current one).
2207  //--------------------------------------------------------------------------
2208  if( pEffectiveDataServerUrl )
2209  {
2210  tried = pEffectiveDataServerUrl->GetHostName();
2211  delete pEffectiveDataServerUrl;
2212  pEffectiveDataServerUrl = 0;
2213  }
2214  //--------------------------------------------------------------------------
2215  // Otherwise use the current URL.
2216  //--------------------------------------------------------------------------
2217  else
2218  tried = pUrl.GetHostName();
2219 
2220  // Report the reason for the failure to the next location
2221  //
2222  if (errNo)
2223  { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2224  else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2225  else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2226  else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2227  }
2228 
2229  //--------------------------------------------------------------------------
2230  // If our current load balancer is a metamanager and we failed either
2231  // at a diskserver or at an unidentified node we also exclude the last
2232  // known manager
2233  //--------------------------------------------------------------------------
2234  if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2235  {
2236  HostList::reverse_iterator it;
2237  for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2238  {
2239  if( it->loadBalancer )
2240  break;
2241 
2242  tried += "," + it->url.GetHostName();
2243 
2244  if( it->flags & kXR_isManager )
2245  break;
2246  }
2247  }
2248 
2249  cgi["tried"] = tried;
2251  MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2253  }
2254 
2255  //----------------------------------------------------------------------------
2256  // Switch on the refresh flag for some requests
2257  //----------------------------------------------------------------------------
2258  void XRootDMsgHandler::SwitchOnRefreshFlag()
2259  {
2261  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2262  switch( req->header.requestid )
2263  {
2264  case kXR_locate:
2265  {
2266  req->locate.options |= kXR_refresh;
2267  break;
2268  }
2269 
2270  case kXR_open:
2271  {
2272  req->locate.options |= kXR_refresh;
2273  break;
2274  }
2275  }
2276  XRootDTransport::SetDescription( pRequest );
2278  }
2279 
2280  //------------------------------------------------------------------------
2281  // If the current thread is a worker thread from our thread-pool
2282  // handle the response, otherwise submit a new task to the thread-pool
2283  //------------------------------------------------------------------------
2284  void XRootDMsgHandler::HandleRspOrQueue()
2285  {
2286  //--------------------------------------------------------------------------
2287  // Is it a final response?
2288  //--------------------------------------------------------------------------
2289  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
2290  if( finalrsp )
2291  {
2292  // Do not do final processing of the response if we haven't had
2293  // confirmation the original request was sent (via OnStatusReady).
2294  // The final processing will be triggered when we get the confirm.
2295  const int sst = pSendingState.fetch_or( kFinalResp );
2296  if( !( sst & kSendDone ) )
2297  return;
2298  }
2299 
2300  JobManager *jobMgr = pPostMaster->GetJobManager();
2301  if( jobMgr->IsWorker() )
2302  HandleResponse();
2303  else
2304  {
2305  Log *log = DefaultEnv::GetLog();
2306  log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2307  pUrl.GetHostId().c_str(), this,
2308  pRequest->GetObfuscatedDescription().c_str() );
2309  jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2310  }
2311  }
2312 
2313  //------------------------------------------------------------------------
2314  // Notify the FileStateHandler to retry Open() with new URL
2315  //------------------------------------------------------------------------
2316  void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2317  {
2318  Log *log = DefaultEnv::GetLog();
2319  log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2320  pUrl.GetHostId().c_str(), this,
2321  pRequest->GetObfuscatedDescription().c_str() );
2322 
2323  if( !pLFileHandler )
2324  {
2325  HandleError( XRootDStatus( stFatal, errNotSupported ) );
2326  return;
2327  }
2328 
2329  AnyObject *resp = 0;
2330  pLFileHandler->SetHostList( *pHosts );
2331  XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2332  if( !st.IsOK() )
2333  {
2334  HandleError( st );
2335  return;
2336  }
2337 
2338  pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2339  resp,
2340  pHosts.release() );
2341  delete this;
2342 
2343  return;
2344  }
2345 
2346  //------------------------------------------------------------------------
2347  // Check if it is OK to retry this request
2348  //------------------------------------------------------------------------
2349  bool XRootDMsgHandler::IsRetriable()
2350  {
2351  std::string value;
2352  DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2353  if( value == "true" ) return true;
2354 
2355  // check if it is a mutable open (open + truncate or open + create)
2356  ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2357  if( req->header.requestid == htons( kXR_open ) )
2358  {
2359  bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2360  ( req->open.options & htons( kXR_new ) );
2361 
2362  if( _mutable )
2363  {
2364  Log *log = DefaultEnv::GetLog();
2365  log->Debug( XRootDMsg,
2366  "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2367  pUrl.GetHostId().c_str(),
2368  pRequest->GetObfuscatedDescription().c_str() );
2369  // disallow retry if it is a mutable open
2370  return false;
2371  }
2372  }
2373 
2374  return true;
2375  }
2376 
2377  //------------------------------------------------------------------------
2378  // Check if for given request and Metalink redirector it is OK to omit
2379  // the kXR_wait and proceed straight to the next entry in the Metalink file
2380  //------------------------------------------------------------------------
2381  bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2382  {
2383  // we can omit kXR_wait only if we have a Metalink redirector
2384  if( !url.IsMetalink() )
2385  return false;
2386 
2387  // we can omit kXR_wait only for requests that can be redirected
2388  // (kXR_read is the only stateful request that can be redirected)
2389  ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2390  if( pStateful && req->header.requestid != kXR_read )
2391  return false;
2392 
2393  // we can only omit kXR_wait if the Metalink redirect has more
2394  // replicas
2395  RedirectorRegistry &registry = RedirectorRegistry::Instance();
2396  VirtualRedirector *redirector = registry.Get( url );
2397 
2398  // we need more than one server as the current one is not reflected
2399  // in tried CGI
2400  if( redirector->Count( request ) > 1 )
2401  return true;
2402 
2403  return false;
2404  }
2405 
2406  //------------------------------------------------------------------------
2407  // Checks if the given error returned by server is retriable.
2408  //------------------------------------------------------------------------
2409  bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2410  {
2411  // we can only retry error response if we have a valid load-balancer and
2412  // it is not our current URL
2413  if( !( pLoadBalancer.url.IsValid() &&
2414  pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2415  return false;
2416 
2417  // following errors are retriable at any load-balancer
2418  if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2419  status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2420  status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2421  return true;
2422 
2423  // check if the load-balancer is a meta-manager, if yes there are
2424  // more errors that can be recovered
2425  if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2426 
2427  // those errors are retriable for meta-managers
2428  if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2429  return true;
2430 
2431  // in case of not-authorized error there is an imposed upper limit
2432  // on how many times we can retry this error
2433  if( status.errNo == kXR_NotAuthorized )
2434  {
2435  int limit = DefaultNotAuthorizedRetryLimit;
2436  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2437  bool ret = pNotAuthorizedCounter < limit;
2438  ++pNotAuthorizedCounter;
2439  if( !ret )
2440  {
2441  Log *log = DefaultEnv::GetLog();
2442  log->Error( XRootDMsg,
2443  "[%s] Reached limit of NotAuthorized retries!",
2444  pUrl.GetHostId().c_str() );
2445  }
2446  return ret;
2447  }
2448 
2449  // check if the load-balancer is a virtual (metalink) redirector,
2450  // if yes there are even more errors that can be recovered
2451  if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2452 
2453  // those errors are retriable for virtual (metalink) redirectors
2454  if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2455  return true;
2456 
2457  // otherwise it is a non-retriable error
2458  return false;
2459  }
2460 
2461  //------------------------------------------------------------------------
2462  // Dump the redirect-trace-back into the log file
2463  //------------------------------------------------------------------------
2464  void XRootDMsgHandler::DumpRedirectTraceBack()
2465  {
2466  if( pRedirectTraceBack.empty() ) return;
2467 
2468  std::stringstream sstrm;
2469 
2470  sstrm << "Redirect trace-back:\n";
2471 
2472  int counter = 0;
2473 
2474  auto itr = pRedirectTraceBack.begin();
2475  sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2476 
2477  auto prev = itr;
2478  ++itr;
2479  ++counter;
2480 
2481  for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2482  sstrm << '\t' << counter << ". "
2483  << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2484 
2485  int authlimit = DefaultNotAuthorizedRetryLimit;
2486  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2487 
2488  bool warn = !pStatus.IsOK() &&
2489  ( pStatus.code == errNotFound ||
2490  pStatus.code == errRedirectLimit ||
2491  ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2492 
2493  Log *log = DefaultEnv::GetLog();
2494  if( warn )
2495  log->Warning( XRootDMsg, "%s", sstrm.str().c_str() );
2496  else
2497  log->Debug( XRootDMsg, "%s", sstrm.str().c_str() );
2498  }
2499 
2500  // Read data from buffer
2501  //------------------------------------------------------------------------
2502  template<typename T>
2503  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2504  {
2505  if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2506 
2507  memcpy(&result, buffer, sizeof(T));
2508 
2509  buffer += sizeof( T );
2510  buflen -= sizeof( T );
2511 
2512  return Status();
2513  }
2514 
2515  //------------------------------------------------------------------------
2516  // Read a string from buffer
2517  //------------------------------------------------------------------------
2518  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2519  {
2520  Status status;
2521  char c = 0;
2522 
2523  while( true )
2524  {
2525  if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2526  return status;
2527 
2528  if( c == 0 ) break;
2529  result += c;
2530  }
2531 
2532  return status;
2533  }
2534 
2535  //------------------------------------------------------------------------
2536  // Read a string from buffer
2537  //------------------------------------------------------------------------
2538  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2539  size_t size, std::string &result )
2540  {
2541  Status status;
2542 
2543  if( size > buflen ) return Status( stError, errDataError );
2544 
2545  result.append( buffer, size );
2546  buffer += size;
2547  buflen -= size;
2548 
2549  return status;
2550  }
2551 
2552 }
@ kXR_NotAuthorized
Definition: XProtocol.hh:1000
@ kXR_NotFound
Definition: XProtocol.hh:1001
@ kXR_FileLocked
Definition: XProtocol.hh:993
@ kXR_noReplicas
Definition: XProtocol.hh:1019
@ kXR_Unsupported
Definition: XProtocol.hh:1003
@ kXR_ServerError
Definition: XProtocol.hh:1002
@ kXR_Overloaded
Definition: XProtocol.hh:1014
@ kXR_ArgTooLong
Definition: XProtocol.hh:992
@ kXR_noserver
Definition: XProtocol.hh:1004
@ kXR_IOError
Definition: XProtocol.hh:997
@ kXR_FSError
Definition: XProtocol.hh:995
@ kXR_NoMemory
Definition: XProtocol.hh:998
#define kXR_isManager
Definition: XProtocol.hh:1156
union ServerResponse::@0 body
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
struct ClientFattrRequest fattr
Definition: XProtocol.hh:854
#define kXR_collapseRedir
Definition: XProtocol.hh:1167
ServerResponseStatus status
Definition: XProtocol.hh:1310
#define kXR_attrMeta
Definition: XProtocol.hh:1159
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:914
kXR_unt16 options
Definition: XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition: XProtocol.hh:852
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_refresh
Definition: XProtocol.hh:459
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientOpenRequest open
Definition: XProtocol.hh:860
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
struct ClientRequestHdr header
Definition: XProtocol.hh:846
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_ping
Definition: XProtocol.hh:123
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
@ kXR_prepare
Definition: XProtocol.hh:133
#define kXR_isServer
Definition: XProtocol.hh:1157
#define kXR_attrVirtRdr
Definition: XProtocol.hh:1162
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1261
union ServerResponseV2::@1 info
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
@ kXR_vfs
Definition: XProtocol.hh:763
struct ClientStatRequest stat
Definition: XProtocol.hh:873
kXR_char options
Definition: XProtocol.hh:769
#define kXR_ecRedir
Definition: XProtocol.hh:1168
struct ClientLocateRequest locate
Definition: XProtocol.hh:856
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
long long kXR_int64
Definition: XPtypes.hh:98
int kXR_int32
Definition: XPtypes.hh:89
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition: XrdClLog.hh:101
@ ErrorMsg
report errors
Definition: XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
Definition: XrdClSocket.cc:461
bool IsEncrypted()
Definition: XrdClSocket.cc:867
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition: XrdClURL.cc:62
void SetPassword(const std::string &password)
Set the password.
Definition: XrdClURL.hh:161
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:344
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:135
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:153
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
void SetProtocol(const std::string &protocol)
Set protocol.
Definition: XrdClURL.hh:126
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
void SetUserName(const std::string &userName)
Set the username.
Definition: XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:703
Handle/Process/Forward XRootD messages.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errNotFound
Definition: XrdClStatus.hh:100
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
none object for initializing empty Optional
XrdSysError Log
Definition: XrdConfig.cc:113
@ kXR_PartialResult
Definition: XProtocol.hh:1251
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
uint32_t flags
Host type.
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version