XRootD
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

+ Inheritance diagram for XrdCl::XRootDMsgHandler:
+ Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 
 ~XRootDMsgHandler ()
 Destructor. More...
 
virtual uint16_t Examine (std::shared_ptr< Message > &msg)
 
time_t GetExpiration ()
 Get a timestamp after which we give up. More...
 
const Message * GetRequest () const
 Get the request pointer. More...
 
virtual uint16_t GetSid () const
 
virtual uint16_t InspectStatusRsp ()
 
virtual bool IsRaw () const
 Are we a raw writer or not? More...
 
virtual void OnStatusReady (const Message *message, XRootDStatus status)
 The requested action has been performed and the status is available. More...
 
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status)
 
void PartialReceived ()
 
virtual void Process ()
 Process the message if it was "taken" by the examine action. More...
 
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead)
 
void SetChunkList (ChunkList *chunkList)
 Set the chunk list. More...
 
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
 
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up. More...
 
void SetFollowMetalink (bool followMetalink)
 
void SetHostList (HostList *hostList)
 Set host list. More...
 
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer. More...
 
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer. More...
 
void SetOksofarAsAnswer (bool oksofarAsAnswer)
 
void SetRedirectAsAnswer (bool redirectAsAnswer)
 
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter. More...
 
void SetStateful (bool stateful)
 
void WaitDone (time_t now)
 
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten)
 
- Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive. More...
 
virtual void OnReadyToSend (Message *msg)
 

Friends

class HandleRspJob
 

Additional Inherited Members

- Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
 
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...
 

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 118 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message * msg,
ResponseHandler * respHandler,
const URL * url,
std::shared_ptr< SIDManager > sidMgr,
LocalFileHandler * lFileHandler 
)
inline

Constructor

Parameters
msg - message that has been sent out
respHandler - response handler to be called when the final response arrives
url - the URL the message has been sent to
sidMgr - the SID manager used to allocate the SID for the initial message

Definition at line 133 of file XrdClXRootDMsgHandler.hh.

137  :
138  pRequest( msg ),
139  pResponseHandler( respHandler ),
140  pUrl( *url ),
141  pEffectiveDataServerUrl( 0 ),
142  pSidMgr( sidMgr ),
143  pLFileHandler( lFileHandler ),
144  pExpiration( 0 ),
145  pRedirectAsAnswer( false ),
146  pOksofarAsAnswer( false ),
147  pHasLoadBalancer( false ),
148  pHasSessionId( false ),
149  pChunkList( 0 ),
150  pKBuff( 0 ),
151  pRedirectCounter( 0 ),
152  pNotAuthorizedCounter( 0 ),
153 
154  pAsyncOffset( 0 ),
155  pAsyncChunkIndex( 0 ),
156 
157  pPgWrtCksumBuff( 4 ),
158  pPgWrtCurrentPageOffset( 0 ),
159  pPgWrtCurrentPageNb( 0 ),
160 
161  pOtherRawStarted( false ),
162 
163  pFollowMetalink( false ),
164 
165  pStateful( false ),
166 
167  pAggregatedWaitTime( 0 ),
168 
169  pMsgInFly( false ),
170 
171  pTimeoutFence( false ),
172 
173  pDirListStarted( false ),
174  pDirListWithStat( false ),
175 
176  pCV( 0 ),
177 
178  pSslErrCnt( 0 )
179  {
180  pPostMaster = DefaultEnv::GetPostMaster();
181  if( msg->GetSessionId() )
182  pHasSessionId = true;
183 
184  Log *log = DefaultEnv::GetLog();
185  log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
186  pUrl.GetHostId().c_str(), this,
187  pRequest->GetDescription().c_str() );
188 
189  ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
190  if( ntohs( hdr->requestid ) == kXR_pgread )
191  {
192  ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
193  pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
194  ntohl( pgrdreq->rlen ) ) );
195  }
196 
197  if( ntohs( hdr->requestid ) == kXR_readv )
198  pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
199  else if( ntohs( hdr->requestid ) == kXR_read )
200  pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
201  else
202  pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
203  }
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_pgread
Definition: XProtocol.hh:142
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
const std::string & GetDescription() const
Get the description of the message.
Definition: XrdClMessage.hh:95
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:94
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition: XrdConfig.cc:111

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

+ Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 208 of file XrdClXRootDMsgHandler.hh.

209  {
210  DumpRedirectTraceBack();
211 
212  if( !pHasSessionId )
213  delete pRequest;
214  delete pEffectiveDataServerUrl;
215 
216  pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
217  pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
218  pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
219  pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
220  pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
221  pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
222 
223  Log *log = DefaultEnv::GetLog();
224  log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
225  pUrl.GetHostId().c_str(), this );
226  }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), and XrdCl::DefaultEnv::GetLog().

+ Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > &  msg)
virtual

Examine an incoming message, and decide on the action to be taken

Parameters
msg - the message, may be zero if receive failed
Returns
action type that needs to be taken with respect to the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 107 of file XrdClXRootDMsgHandler.cc.

108  {
109  //--------------------------------------------------------------------------
110  // if the MsgHandler is already being used to process another request
111  // (kXR_oksofar) we need to wait
112  //--------------------------------------------------------------------------
113  if( pOksofarAsAnswer )
114  {
115  XrdSysCondVarHelper lck( pCV );
116  while( pResponse ) pCV.Wait();
117  }
118  else
119  {
120  if( pResponse )
121  {
122  Log *log = DefaultEnv::GetLog();
123  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
124  "it already owns a response: 0x%x (message: %s ).",
125  pUrl.GetHostId().c_str(), this,
126  pRequest->GetDescription().c_str() );
127  }
128  }
129 
130  if( msg->GetSize() < 8 )
131  return Ignore;
132 
133  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
134  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
135  uint16_t status = 0;
136  uint32_t dlen = 0;
137 
138  //--------------------------------------------------------------------------
139  // We only care about async responses, but those are extracted now
140  // in the SocketHandler.
141  //--------------------------------------------------------------------------
142  if( rsp->hdr.status == kXR_attn )
143  {
144  return Ignore;
145  }
146  //--------------------------------------------------------------------------
147  // We got a sync message - check if it belongs to us
148  //--------------------------------------------------------------------------
149  else
150  {
151  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
152  rsp->hdr.streamid[1] != req->header.streamid[1] )
153  return Ignore;
154 
155  status = rsp->hdr.status;
156  dlen = rsp->hdr.dlen;
157  }
158 
159  //--------------------------------------------------------------------------
160  // We take the ownership of the message and decide what we will do
161  // with the handler itself, the options are:
162  // 1) we want to either read in raw mode (the Raw flag) or have the message
163  // body reconstructed for us by the TransportHandler by the time
164  // Process() is called (default, no extra flag)
165  // 2) we either got a full response in which case we don't want to be
166  // notified about anything anymore (RemoveHandler) or we got a partial
167  // answer and we need to wait for more (default, no extra flag)
168  //--------------------------------------------------------------------------
169  pResponse = msg;
170  pBodyReader->SetDataLength( dlen );
171 
172  Log *log = DefaultEnv::GetLog();
173  switch( status )
174  {
175  //------------------------------------------------------------------------
176  // Handle the cached cases
177  //------------------------------------------------------------------------
178  case kXR_error:
179  case kXR_redirect:
180  case kXR_wait:
181  return RemoveHandler;
182 
183  case kXR_waitresp:
184  {
185  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
186  "message %s", pUrl.GetHostId().c_str(),
187  pRequest->GetDescription().c_str() );
188 
189  pResponse.reset();
190  return Ignore; // This must be handled synchronously!
191  }
192 
193  //------------------------------------------------------------------------
194  // Handle the potential raw cases
195  //------------------------------------------------------------------------
196  case kXR_ok:
197  {
198  //----------------------------------------------------------------------
199  // For kXR_read we read in raw mode
200  //----------------------------------------------------------------------
201  uint16_t reqId = ntohs( req->header.requestid );
202  if( reqId == kXR_read )
203  {
204  return Raw | RemoveHandler;
205  }
206 
207  //----------------------------------------------------------------------
208  // kXR_readv is the same as kXR_read
209  //----------------------------------------------------------------------
210  if( reqId == kXR_readv )
211  {
212  return Raw | RemoveHandler;
213  }
214 
215  //----------------------------------------------------------------------
216  // For everything else we just take what we got
217  //----------------------------------------------------------------------
218  return RemoveHandler;
219  }
220 
221  //------------------------------------------------------------------------
222  // kXR_oksofars are special, they are not full responses, so we reset
223  // the response pointer to 0 and add the message to the partial list
224  //------------------------------------------------------------------------
225  case kXR_oksofar:
226  {
227  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
228  "%s", pUrl.GetHostId().c_str(),
229  pRequest->GetDescription().c_str() );
230 
231  if( !pOksofarAsAnswer )
232  {
233  pPartialResps.emplace_back( std::move( pResponse ) );
234  }
235 
236  //----------------------------------------------------------------------
237  // For kXR_read we either read in raw mode if the message has not
238  // been fully reconstructed already, if it has, we adjust
239  // the buffer offset to prepare for the next one
240  //----------------------------------------------------------------------
241  uint16_t reqId = ntohs( req->header.requestid );
242  if( reqId == kXR_read )
243  {
244  pTimeoutFence.store( true, std::memory_order_relaxed );
245  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
246  }
247 
248  //----------------------------------------------------------------------
249  // kXR_readv is similar to read, except that the payload is different
250  //----------------------------------------------------------------------
251  if( reqId == kXR_readv )
252  {
253  pTimeoutFence.store( true, std::memory_order_relaxed );
254  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
255  }
256 
257  return ( pOksofarAsAnswer ? None : NoProcess );
258  }
259 
260  case kXR_status:
261  {
262  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
263  "%s", pUrl.GetHostId().c_str(),
264  pRequest->GetDescription().c_str() );
265 
266  uint16_t reqId = ntohs( req->header.requestid );
267  if( reqId == kXR_pgwrite )
268  {
269  //--------------------------------------------------------------------
270  // In case of pgwrite by definition this wont be a partial response
271  // so we can already remove the handler from the in-queue
272  //--------------------------------------------------------------------
273  return RemoveHandler;
274  }
275 
276  //----------------------------------------------------------------------
277  // Otherwise (pgread), first of all we need to read the body of the
278  // kXR_status response, we can handle the raw data (if any) only after
279  // we have the whole kXR_status body
280  //----------------------------------------------------------------------
281  pTimeoutFence.store( true, std::memory_order_relaxed );
282  return None;
283  }
284 
285  //------------------------------------------------------------------------
286  // Default
287  //------------------------------------------------------------------------
288  default:
289  return RemoveHandler;
290  }
291  return RemoveHandler;
292  }
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:912
@ kXR_waitresp
Definition: XProtocol.hh:904
@ kXR_redirect
Definition: XProtocol.hh:902
@ kXR_oksofar
Definition: XProtocol.hh:898
@ kXR_status
Definition: XProtocol.hh:905
@ kXR_ok
Definition: XProtocol.hh:897
@ kXR_attn
Definition: XProtocol.hh:899
@ kXR_wait
Definition: XProtocol.hh:903
@ kXR_error
Definition: XProtocol.hh:901
struct ClientRequestHdr header
Definition: XProtocol.hh:844
@ kXR_pgwrite
Definition: XProtocol.hh:138
ServerResponseHeader hdr
Definition: XProtocol.hh:1284
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdSysCondVar::Wait(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inline virtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 329 of file XrdClXRootDMsgHandler.hh.

330  {
331  return pExpiration;
332  }

◆ GetRequest()

const Message* XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 355 of file XrdClXRootDMsgHandler.hh.

356  {
357  return pRequest;
358  }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
virtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 387 of file XrdClXRootDMsgHandler.cc.

388  {
389  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
390  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
391  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
virtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
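msg - the message, may be zero if receive failed (note: the function takes no argument; as the listing below shows, it inspects the response already held in pResponse)
Returns
action type that needs to be taken with respect to the message and the handler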

Implements XrdCl::MsgHandler.

Definition at line 297 of file XrdClXRootDMsgHandler.cc.

298  {
299  if( !pResponse )
300  return 0;
301 
302  Log *log = DefaultEnv::GetLog();
303  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
304 
305  //--------------------------------------------------------------------------
306  // Additional action is only required for kXR_status
307  //--------------------------------------------------------------------------
308  if( rsp->hdr.status != kXR_status ) return 0;
309 
310  //--------------------------------------------------------------------------
311  // Ignore malformed status response
312  //--------------------------------------------------------------------------
313  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
314  {
315  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
316  return Corrupted;
317  }
318 
319  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
320  uint16_t reqId = ntohs( req->header.requestid );
321  //--------------------------------------------------------------------------
322  // Unmarshal the status body
323  //--------------------------------------------------------------------------
324  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
325 
326  if( !st.IsOK() && st.code == errDataError )
327  {
328  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
329  st.GetErrorMessage().c_str() );
330  return Corrupted;
331  }
332 
333  if( !st.IsOK() )
334  {
335  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
336  pUrl.GetHostId().c_str() );
337  pStatus = st;
338  HandleRspOrQueue();
339  return Ignore;
340  }
341 
342  //--------------------------------------------------------------------------
343  // Common handling for partial results
344  //--------------------------------------------------------------------------
345  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
346  if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
347  {
348  pPartialResps.push_back( std::move( pResponse ) );
349  }
350 
351  //--------------------------------------------------------------------------
352  // Decide the actions that we need to take
353  //--------------------------------------------------------------------------
354  uint16_t action = 0;
355  if( reqId == kXR_pgread )
356  {
357  //----------------------------------------------------------------------
358  // The message contains only Status header and body but no raw data
359  //----------------------------------------------------------------------
360  if( !pPageReader )
361  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
362  pPageReader->SetRsp( rspst );
363 
364  action |= Raw;
365 
366  if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
367  action |= NoProcess;
368  else
369  action |= RemoveHandler;
370  }
371  else if( reqId == kXR_pgwrite )
372  {
373  // if data corruption has been detected on the server side we will
374  // send some additional data pointing to the pages that need to be
375  // retransmitted
376  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
377  pResponse->GetCursor() )
378  action |= More;
379  }
380 
381  return action;
382  }
ServerResponseStatus status
Definition: XProtocol.hh:1306
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1258
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1257
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
@ kXR_PartialResult
Definition: XProtocol.hh:1247

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponseStatus::hdr, ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
virtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 927 of file XrdClXRootDMsgHandler.cc.

928  {
929  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
930  uint16_t reqId = ntohs( req->header.requestid );
931  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
932  return true;
933  // checkpoint + execute
934  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
935  {
936  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
937  reqId = ntohs( xeq->header.requestid );
938  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
939  }
940 
941  return false;
942  }
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_chkpoint
Definition: XProtocol.hh:124
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:847

References ClientRequest::chkpoint, XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message message,
XRootDStatus  status 
)
virtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 894 of file XrdClXRootDMsgHandler.cc.

896  {
897  Log *log = DefaultEnv::GetLog();
898 
899  //--------------------------------------------------------------------------
900  // We were successful, so we now need to listen for a response
901  //--------------------------------------------------------------------------
902  if( status.IsOK() )
903  {
904  log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
905  pUrl.GetHostId().c_str(), message->GetDescription().c_str() );
906 
907  log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: 0x%x (message: %s ) from out-queue to in-queue.",
908  pUrl.GetHostId().c_str(), this,
909  pRequest->GetDescription().c_str() );
910 
911  pMsgInFly = true;
912  return;
913  }
914 
915  //--------------------------------------------------------------------------
916  // We have failed, recover if possible
917  //--------------------------------------------------------------------------
918  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
919  "recover.", pUrl.GetHostId().c_str(),
920  message->GetDescription().c_str() );
921  HandleError( status );
922  }

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::ExDbgMsg, XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent  event,
XRootDStatus  status 
)
virtual

Handle an event other than a message arrival

Parameters
event - type of the event
status - status info

Reimplemented from XrdCl::MsgHandler.

Definition at line 857 of file XrdClXRootDMsgHandler.cc.

859  {
860  Log *log = DefaultEnv::GetLog();
861  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
862  pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
863 
864  if( event == Ready )
865  return 0;
866 
867  if( pTimeoutFence.load( std::memory_order_relaxed ) )
868  return 0;
869 
870  HandleError( status );
871  return RemoveHandler;
872  }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1104 of file XrdClXRootDMsgHandler.cc.

1105  {
1106  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1107  }

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::OnError(), and XrdCl::Stream::OnIncoming().

+ Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
virtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msg: the message to be processed

Reimplemented from XrdCl::MsgHandler.

Definition at line 396 of file XrdClXRootDMsgHandler.cc.

397  {
398  Log *log = DefaultEnv::GetLog();
399 
400  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
401 
402  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
403 
404  //--------------------------------------------------------------------------
405  // If it is a local file, it can be only a metalink redirector
406  //--------------------------------------------------------------------------
407  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
408  pHosts->back().protocol = kXR_PROTOCOLVERSION;
409 
410  //--------------------------------------------------------------------------
411  // We got an answer, check who we were talking to
412  //--------------------------------------------------------------------------
413  else
414  {
415  AnyObject qryResult;
416  int *qryResponse = 0;
417  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
418  qryResult.Get( qryResponse );
419  pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
420  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
421  qryResult.Get( qryResponse );
422  pHosts->back().protocol = *qryResponse; delete qryResponse;
423  }
424 
425  //--------------------------------------------------------------------------
426  // Process the message
427  //--------------------------------------------------------------------------
428  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
429  if( !st.IsOK() )
430  {
431  pStatus = Status( stFatal, errInvalidMessage );
432  HandleResponse();
433  return;
434  }
435 
436  //--------------------------------------------------------------------------
437  // we have a response for the message so it's not in flight anymore
438  //--------------------------------------------------------------------------
439  pMsgInFly = false;
440 
441  //--------------------------------------------------------------------------
442  // Reset the aggregated wait (used to omit wait response in case of Metalink
443  // redirector)
444  //--------------------------------------------------------------------------
445  if( rsp->hdr.status != kXR_wait )
446  pAggregatedWaitTime = 0;
447 
448  switch( rsp->hdr.status )
449  {
450  //------------------------------------------------------------------------
451  // kXR_ok - we're done here
452  //------------------------------------------------------------------------
453  case kXR_ok:
454  {
455  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
456  pUrl.GetHostId().c_str(),
457  pRequest->GetDescription().c_str() );
458  pStatus = Status();
459  HandleResponse();
460  return;
461  }
462 
463  case kXR_status:
464  {
465  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
466  pUrl.GetHostId().c_str(),
467  pRequest->GetDescription().c_str() );
468  pStatus = Status();
469  HandleResponse();
470  return;
471  }
472 
473  //------------------------------------------------------------------------
474  // kXR_oksofar - we're serving partial result to the user
475  //------------------------------------------------------------------------
476  case kXR_oksofar:
477  {
478  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
479  pUrl.GetHostId().c_str(),
480  pRequest->GetDescription().c_str() );
481  pStatus = Status( stOK, suContinue );
482  HandleResponse();
483  return;
484  }
485 
486  //------------------------------------------------------------------------
487  // kXR_error - we've got a problem
488  //------------------------------------------------------------------------
489  case kXR_error:
490  {
491  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
492  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
493  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
494  "[%d] %s", pUrl.GetHostId().c_str(),
495  pRequest->GetDescription().c_str(), rsp->body.error.errnum,
496  errmsg );
497  delete [] errmsg;
498 
499  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
500  return;
501  }
502 
503  //------------------------------------------------------------------------
504  // kXR_redirect - they tell us to go elsewhere
505  //------------------------------------------------------------------------
506  case kXR_redirect:
507  {
508  if( rsp->hdr.dlen <= 4 )
509  {
510  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
511  pUrl.GetHostId().c_str() );
512  pStatus = Status( stError, errInvalidResponse );
513  HandleResponse();
514  return;
515  }
516 
517  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
518  urlInfoBuff[rsp->hdr.dlen-4] = 0;
519  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
520  std::string urlInfo = urlInfoBuff;
521  delete [] urlInfoBuff;
522  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
523  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
524  pRequest->GetDescription().c_str(), urlInfo.c_str(),
525  rsp->body.redirect.port );
526 
527  //----------------------------------------------------------------------
528  // Check if we can proceed
529  //----------------------------------------------------------------------
530  if( !pRedirectCounter )
531  {
532  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
533  "message %s, the last known error is: %s",
534  pUrl.GetHostId().c_str(),
535  pRequest->GetDescription().c_str(),
536  pLastError.ToString().c_str() );
537 
538 
539  pStatus = Status( stFatal, errRedirectLimit );
540  HandleResponse();
541  return;
542  }
543  --pRedirectCounter;
544 
545  //----------------------------------------------------------------------
546  // Keep the info about this server if we still need to find a load
547  // balancer
548  //----------------------------------------------------------------------
549  uint32_t flags = pHosts->back().flags;
550  if( !pHasLoadBalancer )
551  {
552  if( flags & kXR_isManager )
553  {
554  //------------------------------------------------------------------
555  // If the current server is a meta manager then it supersedes
556  // any existing load balancer, otherwise we assign a load-balancer
557  // only if it has not been already assigned
558  //------------------------------------------------------------------
559  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
560  {
561  pLoadBalancer = pHosts->back();
562  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
563  "as a load-balancer for message %s",
564  pUrl.GetHostId().c_str(),
565  pRequest->GetDescription().c_str() );
566  HostList::iterator it;
567  for( it = pHosts->begin(); it != pHosts->end(); ++it )
568  it->loadBalancer = false;
569  pHosts->back().loadBalancer = true;
570  }
571  }
572  }
573 
574  //----------------------------------------------------------------------
575  // If the redirect comes from a data server save the URL because
576  // in case of a failure we will use it as the effective data server URL
577  // for the tried CGI opaque info
578  //----------------------------------------------------------------------
579  if( flags & kXR_isServer )
580  pEffectiveDataServerUrl = new URL( pHosts->back().url );
581 
582  //----------------------------------------------------------------------
583  // Build the URL and check its validity
584  //----------------------------------------------------------------------
585  std::vector<std::string> urlComponents;
586  std::string newCgi;
587  Utils::splitString( urlComponents, urlInfo, "?" );
588 
589  std::ostringstream o;
590 
591  o << urlComponents[0];
592  if( rsp->body.redirect.port > 0 )
593  o << ":" << rsp->body.redirect.port << "/";
594  else if( rsp->body.redirect.port < 0 )
595  {
596  //--------------------------------------------------------------------
597  // check if the manager wants to enforce write recovery at himself
598  // (beware we are dealing here with negative flags)
599  //--------------------------------------------------------------------
600  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
601  pHosts->back().flags |= kXR_recoverWrts;
602 
603  //--------------------------------------------------------------------
604  // check if the manager wants to collapse the communication channel
605  // (the redirect host is to replace the current host)
606  //--------------------------------------------------------------------
607  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
608  {
609  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
610  pPostMaster->CollapseRedirect( pUrl, url );
611  }
612 
613  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
614  {
615  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
616  if( Utils::CheckEC( pRequest, url ) )
617  pRedirectAsAnswer = true;
618  }
619  }
620 
621  URL newUrl = URL( o.str() );
622  if( !newUrl.IsValid() )
623  {
624  pStatus = Status( stError, errInvalidRedirectURL );
625  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
626  pUrl.GetHostId().c_str(), urlInfo.c_str() );
627  HandleResponse();
628  return;
629  }
630 
631  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
632  newUrl.SetUserName( pUrl.GetUserName() );
633 
634  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
635  newUrl.SetPassword( pUrl.GetPassword() );
636 
637  //----------------------------------------------------------------------
638  // Forward any "xrd.*" params from the original client request also to
639  // the new redirection url
640  // Also, we need to preserve any "xrdcl.*' as they are important for
641  // our internal workflows.
642  //----------------------------------------------------------------------
643  std::ostringstream ossXrd;
644  const URL::ParamsMap &urlParams = pUrl.GetParams();
645 
646  for(URL::ParamsMap::const_iterator it = urlParams.begin();
647  it != urlParams.end(); ++it )
648  {
649  if( it->first.compare( 0, 4, "xrd." ) &&
650  it->first.compare( 0, 6, "xrdcl." ) )
651  continue;
652 
653  ossXrd << it->first << '=' << it->second << '&';
654  }
655 
656  std::string xrdCgi = ossXrd.str();
657  pRedirectUrl = newUrl.GetURL();
658 
659  URL cgiURL;
660  if( urlComponents.size() > 1 )
661  {
662  pRedirectUrl += "?";
663  pRedirectUrl += urlComponents[1];
664  std::ostringstream o;
665  o << "fake://fake:111//fake?";
666  o << urlComponents[1];
667 
668  if( urlComponents.size() == 3 )
669  o << '?' << urlComponents[2];
670 
671  if (!xrdCgi.empty())
672  {
673  o << '&' << xrdCgi;
674  pRedirectUrl += '&';
675  pRedirectUrl += xrdCgi;
676  }
677 
678  cgiURL = URL( o.str() );
679  }
680  else {
681  if (!xrdCgi.empty())
682  {
683  std::ostringstream o;
684  o << "fake://fake:111//fake?";
685  o << xrdCgi;
686  cgiURL = URL( o.str() );
687  pRedirectUrl += '?';
688  pRedirectUrl += xrdCgi;
689  }
690  }
691 
692  //----------------------------------------------------------------------
693  // Check if we need to return the URL as a response
694  //----------------------------------------------------------------------
695  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
696  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
697  !newUrl.IsLocalFile() )
698  pRedirectAsAnswer = true;
699 
700  if( pRedirectAsAnswer )
701  {
702  pStatus = Status( stError, errRedirect );
703  HandleResponse();
704  return;
705  }
706 
707  //----------------------------------------------------------------------
708  // Rewrite the message in a way required to send it to another server
709  //----------------------------------------------------------------------
710  newUrl.SetParams( cgiURL.GetParams() );
711  Status st = RewriteRequestRedirect( newUrl );
712  if( !st.IsOK() )
713  {
714  pStatus = st;
715  HandleResponse();
716  return;
717  }
718 
719  //----------------------------------------------------------------------
720  // Make sure we don't change the protocol by accident (root vs roots)
721  //----------------------------------------------------------------------
722  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
723  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
724  newUrl.SetProtocol( "roots" );
725 
726  //----------------------------------------------------------------------
727  // Send the request to the new location
728  //----------------------------------------------------------------------
729  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
730  return;
731  }
732 
733  //------------------------------------------------------------------------
734  // kXR_wait - we wait, and re-issue the request later
735  //------------------------------------------------------------------------
736  case kXR_wait:
737  {
738  uint32_t waitSeconds = 0;
739 
740  if( rsp->hdr.dlen >= 4 )
741  {
742  char *infoMsg = new char[rsp->hdr.dlen-3];
743  infoMsg[rsp->hdr.dlen-4] = 0;
744  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
745  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
746  "message %s: %s", pUrl.GetHostId().c_str(),
747  rsp->body.wait.seconds, pRequest->GetDescription().c_str(),
748  infoMsg );
749  delete [] infoMsg;
750  waitSeconds = rsp->body.wait.seconds;
751  }
752  else
753  {
754  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
755  "message %s", pUrl.GetHostId().c_str(),
756  pRequest->GetDescription().c_str() );
757  }
758 
759  pAggregatedWaitTime += waitSeconds;
760 
761  // We need a special case if the data node comes from metalink
762  // redirector. In this case it might make more sense to try the
763  // next entry in the Metalink than wait.
764  if( OmitWait( *pRequest, pLoadBalancer.url ) )
765  {
766  int maxWait = DefaultMaxMetalinkWait;
767  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
768  if( pAggregatedWaitTime > maxWait )
769  {
770  UpdateTriedCGI();
771  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
772  return;
773  }
774  }
775 
776  //----------------------------------------------------------------------
777  // Some messages require rewriting before they can be sent again
778  // after wait
779  //----------------------------------------------------------------------
780  Status st = RewriteRequestWait();
781  if( !st.IsOK() )
782  {
783  pStatus = st;
784  HandleResponse();
785  return;
786  }
787 
788  //----------------------------------------------------------------------
789  // Register a task to resend the message in some seconds, if we still
790  // have time to do that, and report a timeout otherwise
791  //----------------------------------------------------------------------
792  time_t resendTime = ::time(0)+waitSeconds;
793 
794  if( resendTime < pExpiration )
795  {
796  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: 0x%x (message: %s ).",
797  pUrl.GetHostId().c_str(), this,
798  pRequest->GetDescription().c_str() );
799 
800  TaskManager *taskMgr = pPostMaster->GetTaskManager();
801  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
802  }
803  else
804  {
805  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
806  pUrl.GetHostId().c_str(),
807  pRequest->GetDescription().c_str() );
808  HandleError( Status( stError, errOperationExpired) );
809  }
810  return;
811  }
812 
813  //------------------------------------------------------------------------
814  // kXR_waitresp - the response will be returned in some seconds as an
815  // unsolicited message. Currently all messages of this type are handled
816  // one step before in the XrdClStream::OnIncoming as they need to be
817  // processed synchronously.
818  //------------------------------------------------------------------------
819  case kXR_waitresp:
820  {
821  if( rsp->hdr.dlen < 4 )
822  {
823  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
824  pUrl.GetHostId().c_str() );
825  pStatus = Status( stError, errInvalidResponse );
826  HandleResponse();
827  return;
828  }
829 
830  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
831  "message %s", pUrl.GetHostId().c_str(),
832  rsp->body.waitresp.seconds,
833  pRequest->GetDescription().c_str() );
834  return;
835  }
836 
837  //------------------------------------------------------------------------
838  // Default - unrecognized/unsupported response, declare an error
839  //------------------------------------------------------------------------
840  default:
841  {
842  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
843  "message %s", pUrl.GetHostId().c_str(),
844  rsp->hdr.status, pRequest->GetDescription().c_str() );
845  pStatus = Status( stError, errInvalidResponse );
846  HandleResponse();
847  return;
848  }
849  }
850 
851  return;
852  }
#define kXR_isManager
Definition: XProtocol.hh:1154
union ServerResponse::@0 body
#define kXR_collapseRedir
Definition: XProtocol.hh:1164
#define kXR_attrMeta
Definition: XProtocol.hh:1156
#define kXR_recoverWrts
Definition: XProtocol.hh:1163
#define kXR_isServer
Definition: XProtocol.hh:1155
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
#define kXR_ecRedir
Definition: XProtocol.hh:1165
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
void RegisterTask(Task *task, time_t time, bool own=true)
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:451
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:113
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:130
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:148
bool IsLocalFile() const
Definition: XrdClURL.cc:460
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:239
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:438
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:700
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
URL url
URL of the host.
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::PostMaster::CollapseRedirect(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::Message::GetDescription(), XrdCl::DefaultEnv::GetEnv(), XrdCl::URL::GetHostId(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::PostMaster::GetTaskManager(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::PostMaster::QueryTransport(), XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::Status::ToString(), XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::HostInfo::url, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message msg,
Socket socket,
uint32_t &  bytesRead 
)
virtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msg: the corresponding message header
socket: the socket to read from
bytesRead: number of bytes read by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data is needed; stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 877 of file XrdClXRootDMsgHandler.cc.

880  {
881  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
882  uint16_t reqId = ntohs( req->header.requestid );
883 
884  if( reqId == kXR_pgread )
885  return pPageReader->Read( *socket, bytesRead );
886 
887  return pBodyReader->Read( *socket, bytesRead );
888  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList chunkList)
inline

Set the chunk list.

Definition at line 382 of file XrdClXRootDMsgHandler.hh.

383  {
384  pChunkList = chunkList;
385  if( pBodyReader )
386  pBodyReader->SetChunkList( chunkList );
387  if( chunkList )
388  pChunkStatus.resize( chunkList->size() );
389  else
390  pChunkStatus.clear();
391  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > &&  crc32cDigests)
inline

Definition at line 393 of file XrdClXRootDMsgHandler.hh.

394  {
395  pCrc32cDigests = std::move( crc32cDigests );
396  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t  expiration)
inline

Set a timestamp after which we give up.

Definition at line 321 of file XrdClXRootDMsgHandler.hh.

322  {
323  pExpiration = expiration;
324  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool  followMetalink)
inline

Definition at line 414 of file XrdClXRootDMsgHandler.hh.

415  {
416  pFollowMetalink = followMetalink;
417  }

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList hostList)
inline

Set host list.

Definition at line 374 of file XrdClXRootDMsgHandler.hh.

375  {
376  pHosts.reset( hostList );
377  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer kbuff)
inline

Set the kernel buffer.

Definition at line 401 of file XrdClXRootDMsgHandler.hh.

402  {
403  pKBuff = kbuff;
404  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo loadBalancer)
inline

Set the load balancer.

Definition at line 363 of file XrdClXRootDMsgHandler.hh.

364  {
365  if( !loadBalancer.url.IsValid() )
366  return;
367  pLoadBalancer = loadBalancer;
368  pHasLoadBalancer = true;
369  }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool  oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 347 of file XrdClXRootDMsgHandler.hh.

348  {
349  pOksofarAsAnswer = oksofarAsAnswer;
350  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool  redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 338 of file XrdClXRootDMsgHandler.hh.

339  {
340  pRedirectAsAnswer = redirectAsAnswer;
341  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t  redirectCounter)
inline

Set the redirect counter.

Definition at line 409 of file XrdClXRootDMsgHandler.hh.

410  {
411  pRedirectCounter = redirectCounter;
412  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool  stateful)
inline

Definition at line 419 of file XrdClXRootDMsgHandler.hh.

420  {
421  pStateful = stateful;
422  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t  now)

Called after the wait time for kXR_wait has elapsed

Parameters
now: current timestamp

Definition at line 1096 of file XrdClXRootDMsgHandler.cc.

1097  {
1098  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1099  }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket socket,
uint32_t &  bytesWritten 
)
virtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socket: the socket to write to
bytesWritten: number of bytes written by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data needs to be written; stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 947 of file XrdClXRootDMsgHandler.cc.

949  {
950  //--------------------------------------------------------------------------
951  // First check if it is a PgWrite
952  //--------------------------------------------------------------------------
953  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
954  {
955  //------------------------------------------------------------------------
956  // PgWrite will have just one chunk
957  //------------------------------------------------------------------------
958  ChunkInfo chunk = pChunkList->front();
959  //------------------------------------------------------------------------
960  // Calculate the size of the first and last page (in case the chunk is not
961  // 4KB aligned)
962  //------------------------------------------------------------------------
963  int fLen = 0, lLen = 0;
964  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
965 
966  //------------------------------------------------------------------------
967  // Set the crc32c buffer if not ready yet
968  //------------------------------------------------------------------------
969  if( pPgWrtCksumBuff.GetCursor() == 0 )
970  {
971  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
972  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
973  }
974 
975  uint32_t btsLeft = chunk.length - pAsyncOffset;
976  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
977  if( pglen > btsLeft ) pglen = btsLeft;
978  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
979 
980  while( btsLeft > 0 )
981  {
982  // first write the crc32c digest
983  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
984  {
985  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
986  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
987  int btswrt = 0;
988  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
989  if( !st.IsOK() ) return st;
990  bytesWritten += btswrt;
991  pPgWrtCksumBuff.AdvanceCursor( btswrt );
992  if( st.code == suRetry ) return st;
993  }
994  // then write the raw data (one page)
995  int btswrt = 0;
996  Status st = socket->Send( pgbuf, pglen, btswrt );
997  if( !st.IsOK() ) return st;
998  pgbuf += btswrt;
999  pglen -= btswrt;
1000  btsLeft -= btswrt;
1001  bytesWritten += btswrt;
1002  pAsyncOffset += btswrt; // update the offset to the raw data
1003  if( st.code == suRetry ) return st;
1004  // if we managed to write all the data ...
1005  if( pglen == 0 )
1006  {
1007  // move to the next page
1008  ++pPgWrtCurrentPageNb;
1009  if( pPgWrtCurrentPageNb < nbpgs )
1010  {
1011  // set the digest buffer
1012  pPgWrtCksumBuff.SetCursor( 0 );
1013  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1014  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1015  }
1016  // set the page length
1017  pglen = XrdSys::PageSize;
1018  if( pglen > btsLeft ) pglen = btsLeft;
1019  // reset offset in the current page
1020  pPgWrtCurrentPageOffset = 0;
1021  }
1022  else
1023  // otherwise just adjust the offset in the current page
1024  pPgWrtCurrentPageOffset += btswrt;
1025 
1026  }
1027  }
1028  else if( !pChunkList->empty() )
1029  {
1030  size_t size = pChunkList->size();
1031  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1032  {
1033  char *buffer = (char*)(*pChunkList)[i].buffer;
1034  uint32_t size = (*pChunkList)[i].length;
1035  size_t leftToBeWritten = size - pAsyncOffset;
1036 
1037  while( leftToBeWritten )
1038  {
1039  int btswrt = 0;
1040  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1041  bytesWritten += btswrt;
1042  if( !st.IsOK() || st.code == suRetry ) return st;
1043  pAsyncOffset += btswrt;
1044  leftToBeWritten -= btswrt;
1045  }
1046  //----------------------------------------------------------------------
1047  // Remember that we have moved to the next chunk, also clear the offset
1048  // within the buffer as we are going to move to a new one
1049  //----------------------------------------------------------------------
1050  ++pAsyncChunkIndex;
1051  pAsyncOffset = 0;
1052  }
1053  }
1054  else
1055  {
1056  Log *log = DefaultEnv::GetLog();
1057 
1058  //------------------------------------------------------------------------
1059  // If the socket is encrypted we cannot use a kernel buffer, we have to
1060  // convert to user space buffer
1061  //------------------------------------------------------------------------
1062  if( socket->IsEncrypted() )
1063  {
1064  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1065  pUrl.GetHostId().c_str() );
1066 
1067  char *ubuff = 0;
1068  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1069  if( ret < 0 ) return Status( stError, errInternal );
1070  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1071  return WriteMessageBody( socket, bytesWritten );
1072  }
1073 
1074  //------------------------------------------------------------------------
1075  // Send the data
1076  //------------------------------------------------------------------------
1077  while( !pKBuff->Empty() )
1078  {
1079  int btswrt = 0;
1080  Status st = socket->Send( *pKBuff, btswrt );
1081  bytesWritten += btswrt;
1082  if( !st.IsOK() || st.code == suRetry ) return st;
1083  }
1084 
1085  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1086  pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
1087  }
1088 
1089  return Status();
1090  }
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::Buffer::AdvanceCursor(), XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdSys::KernelBuffer::Empty(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Message::GetDescription(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::Buffer::SetCursor(), XrdCl::stError, XrdCl::suRetry, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

Friends And Related Function Documentation

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 120 of file XrdClXRootDMsgHandler.hh.


The documentation for this class was generated from the following files: