XRootD
XrdCl::XRootDTransport Class Reference

XRootD transport handler. More...

#include <XrdClXRootDTransport.hh>

+ Inheritance diagram for XrdCl::XRootDTransport:
+ Collaboration diagram for XrdCl::XRootDTransport:

Public Member Functions

 XRootDTransport ()
 Constructor. More...
 
 ~XRootDTransport ()
 Destructor. More...
 
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel. More...
 
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected, do the cleanups. More...
 
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel. More...
 
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream. More...
 
virtual XRootDStatus GetBody (Message &message, Socket *socket)
 
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
 
virtual XRootDStatus GetMore (Message &message, Socket *socket)
 
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message. More...
 
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message. More...
 
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake. More...
 
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel. More...
 
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
 
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected. More...
 
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action. More...
 
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent. More...
 
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual bool NeedControlConnection ()
 
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel. More...
 
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return a number of substreams per stream that should be created. More...
 
virtual void WaitBeforeExit ()
 Wait until the program can safely exit. More...
 
- Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()
 

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message. More...
 
static void LogErrorResponse (const Message &msg)
 Log server error response. More...
 
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message. More...
 
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message. More...
 
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams. More...
 
static void SetDescription (Message *msg)
 Get the description of a message. More...
 
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite. More...
 
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message. More...
 
static void UnMarshallHeader (Message &msg)
 Unmarshall the header incoming message. More...
 
static XRootDStatus UnMarshallRequest (Message *msg)
 
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response. More...
 

Friends

struct PluginUnloadHandler
 

Additional Inherited Members

- Public Types inherited from XrdCl::TransportHandler
enum  StreamAction {
  NoAction = 0x0000 ,
  DigestMsg = 0x0001 ,
  AbortStream = 0x0002 ,
  CloseStream = 0x0004 ,
  ResumeStream = 0x0008 ,
  HoldStream = 0x0010 ,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages. More...
 

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 291 of file XrdClXRootDTransport.cc.

291  :
292  pSecUnloadHandler( new PluginUnloadHandler() )
293  {
294  }

◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 299 of file XrdClXRootDTransport.cc.

300  {
301  delete pSecUnloadHandler; pSecUnloadHandler = 0;
302  }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject channelData)
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1812 of file XrdClXRootDTransport.cc.

1813  {
1814  XRootDChannelInfo *info = 0;
1815  channelData.Get( info );
1816  if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1817  info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1818  }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject channelData,
uint16_t  subStreamId 
)
virtual

The stream has been disconnected, do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1544 of file XrdClXRootDTransport.cc.

1546  {
1547  XRootDChannelInfo *info = 0;
1548  channelData.Get( info );
1549 
1550  if (!info) {
1551  DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1552  return;
1553  }
1554 
1555  XrdSysMutexHelper scopedLock( info->mutex );
1556 
1557  CleanUpProtection( info );
1558 
1559  if( !info->stream.empty() )
1560  {
1561  XRootDStreamInfo &sInfo = info->stream[subStreamId];
1562  sInfo.status = XRootDStreamInfo::Disconnected;
1563  }
1564 
1565  if( subStreamId == 0 )
1566  {
1567  info->sidManager->ReleaseAllTimedOut();
1568  info->sentOpens.clear();
1569  info->sentCloses.clear();
1570  info->openFiles = 0;
1571  info->waitBarrier = 0;
1572  }
1573  }
static Log * GetLog()
Get default log.
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
const uint64_t XRootDTransportMsg

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject channelData)
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 460 of file XrdClXRootDTransport.cc.

461  {
462  }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char *  msg,
std::ostringstream &  o 
)
static

Get the description of a message.

Definition at line 2975 of file XrdClXRootDTransport.cc.

2976  {
2977  Log *log = DefaultEnv::GetLog();
2978  if( log->GetLevel() < Log::ErrorMsg )
2979  return;
2980 
2981  ClientRequestHdr *req = (ClientRequestHdr *)msg;
2982  switch( req->requestid )
2983  {
2984  //------------------------------------------------------------------------
2985  // kXR_open
2986  //------------------------------------------------------------------------
2987  case kXR_open:
2988  {
2989  ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2990  o << "kXR_open (";
2991  char *fn = GetDataAsString( msg );
2992  o << "file: " << fn << ", ";
2993  delete [] fn;
2994  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2995  o << std::setbase(10);
2996  o << "flags: ";
2997  if( sreq->options == 0 )
2998  o << "none";
2999  else
3000  {
3001  if( sreq->options & kXR_compress )
3002  o << "kXR_compress ";
3003  if( sreq->options & kXR_delete )
3004  o << "kXR_delete ";
3005  if( sreq->options & kXR_force )
3006  o << "kXR_force ";
3007  if( sreq->options & kXR_mkpath )
3008  o << "kXR_mkpath ";
3009  if( sreq->options & kXR_new )
3010  o << "kXR_new ";
3011  if( sreq->options & kXR_nowait )
3012  o << "kXR_nowait ";
3013  if( sreq->options & kXR_open_apnd )
3014  o << "kXR_open_apnd ";
3015  if( sreq->options & kXR_open_read )
3016  o << "kXR_open_read ";
3017  if( sreq->options & kXR_open_updt )
3018  o << "kXR_open_updt ";
3019  if( sreq->options & kXR_open_wrto )
3020  o << "kXR_open_wrto ";
3021  if( sreq->options & kXR_posc )
3022  o << "kXR_posc ";
3023  if( sreq->options & kXR_prefname )
3024  o << "kXR_prefname ";
3025  if( sreq->options & kXR_refresh )
3026  o << "kXR_refresh ";
3027  if( sreq->options & kXR_4dirlist )
3028  o << "kXR_4dirlist ";
3029  if( sreq->options & kXR_replica )
3030  o << "kXR_replica ";
3031  if( sreq->options & kXR_seqio )
3032  o << "kXR_seqio ";
3033  if( sreq->options & kXR_async )
3034  o << "kXR_async ";
3035  if( sreq->options & kXR_retstat )
3036  o << "kXR_retstat ";
3037  }
3038  o << ")";
3039  break;
3040  }
3041 
3042  //------------------------------------------------------------------------
3043  // kXR_close
3044  //------------------------------------------------------------------------
3045  case kXR_close:
3046  {
3047  ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
3048  o << "kXR_close (";
3049  o << "handle: " << FileHandleToStr( sreq->fhandle );
3050  o << ")";
3051  break;
3052  }
3053 
3054  //------------------------------------------------------------------------
3055  // kXR_stat
3056  //------------------------------------------------------------------------
3057  case kXR_stat:
3058  {
3059  ClientStatRequest *sreq = (ClientStatRequest *)msg;
3060  o << "kXR_stat (";
3061  if( sreq->dlen )
3062  {
3063  char *fn = GetDataAsString( msg );;
3064  o << "path: " << fn << ", ";
3065  delete [] fn;
3066  }
3067  else
3068  {
3069  o << "handle: " << FileHandleToStr( sreq->fhandle );
3070  o << ", ";
3071  }
3072  o << "flags: ";
3073  if( sreq->options == 0 )
3074  o << "none";
3075  else
3076  {
3077  if( sreq->options & kXR_vfs )
3078  o << "kXR_vfs";
3079  }
3080  o << ")";
3081  break;
3082  }
3083 
3084  //------------------------------------------------------------------------
3085  // kXR_read
3086  //------------------------------------------------------------------------
3087  case kXR_read:
3088  {
3089  ClientReadRequest *sreq = (ClientReadRequest *)msg;
3090  o << "kXR_read (";
3091  o << "handle: " << FileHandleToStr( sreq->fhandle );
3092  o << std::setbase(10);
3093  o << ", ";
3094  o << "offset: " << sreq->offset << ", ";
3095  o << "size: " << sreq->rlen << ")";
3096  break;
3097  }
3098 
3099  //------------------------------------------------------------------------
3100  // kXR_pgread
3101  //------------------------------------------------------------------------
3102  case kXR_pgread:
3103  {
3105  o << "kXR_pgread (";
3106  o << "handle: " << FileHandleToStr( sreq->fhandle );
3107  o << std::setbase(10);
3108  o << ", ";
3109  o << "offset: " << sreq->offset << ", ";
3110  o << "size: " << sreq->rlen << ")";
3111  break;
3112  }
3113 
3114  //------------------------------------------------------------------------
3115  // kXR_write
3116  //------------------------------------------------------------------------
3117  case kXR_write:
3118  {
3119  ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3120  o << "kXR_write (";
3121  o << "handle: " << FileHandleToStr( sreq->fhandle );
3122  o << std::setbase(10);
3123  o << ", ";
3124  o << "offset: " << sreq->offset << ", ";
3125  o << "size: " << sreq->dlen << ")";
3126  break;
3127  }
3128 
3129  //------------------------------------------------------------------------
3130  // kXR_pgwrite
3131  //------------------------------------------------------------------------
3132  case kXR_pgwrite:
3133  {
3135  o << "kXR_pgwrite (";
3136  o << "handle: " << FileHandleToStr( sreq->fhandle );
3137  o << std::setbase(10);
3138  o << ", ";
3139  o << "offset: " << sreq->offset << ", ";
3140  o << "size: " << sreq->dlen << ")";
3141  break;
3142  }
3143 
3144  //------------------------------------------------------------------------
3145  // kXR_fattr
3146  //------------------------------------------------------------------------
3147  case kXR_fattr:
3148  {
3149  ClientFattrRequest *sreq = (ClientFattrRequest *)msg;
3150  int nattr = sreq->numattr;
3151  int options = sreq->options;
3152  o << "kXR_fattr";
3153  switch (sreq->subcode) {
3154  case kXR_fattrGet:
3155  o << "Get";
3156  break;
3157  case kXR_fattrSet:
3158  o << "Set";
3159  break;
3160  case kXR_fattrList:
3161  o << "List";
3162  break;
3163  case kXR_fattrDel:
3164  o << "Delete";
3165  break;
3166  default:
3167  o << " unknown subcode: " << sreq->subcode;
3168  break;
3169  }
3170  o << " (handle: " << FileHandleToStr( sreq->fhandle );
3171  o << std::setbase(10);
3172  if (nattr)
3173  o << ", numattr: " << nattr;
3174  if (options) {
3175  o << ", options: ";
3176  if (options & 0x01)
3177  o << "new";
3178  if (options & 0x10)
3179  o << "list values";
3180  }
3181  o << ", total size: " << req->dlen << ")";
3182  break;
3183  }
3184 
3185  //------------------------------------------------------------------------
3186  // kXR_sync
3187  //------------------------------------------------------------------------
3188  case kXR_sync:
3189  {
3190  ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3191  o << "kXR_sync (";
3192  o << "handle: " << FileHandleToStr( sreq->fhandle );
3193  o << ")";
3194  break;
3195  }
3196 
3197  //------------------------------------------------------------------------
3198  // kXR_truncate
3199  //------------------------------------------------------------------------
3200  case kXR_truncate:
3201  {
3203  o << "kXR_truncate (";
3204  if( !sreq->dlen )
3205  o << "handle: " << FileHandleToStr( sreq->fhandle );
3206  else
3207  {
3208  char *fn = GetDataAsString( msg );
3209  o << "file: " << fn;
3210  delete [] fn;
3211  }
3212  o << std::setbase(10);
3213  o << ", ";
3214  o << "offset: " << sreq->offset;
3215  o << ")";
3216  break;
3217  }
3218 
3219  //------------------------------------------------------------------------
3220  // kXR_readv
3221  //------------------------------------------------------------------------
3222  case kXR_readv:
3223  {
3224  unsigned char *fhandle = 0;
3225  o << "kXR_readv (";
3226 
3227  o << "handle: ";
3228  readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3229  fhandle = dataChunk[0].fhandle;
3230  if( fhandle )
3231  o << FileHandleToStr( fhandle );
3232  else
3233  o << "unknown";
3234  o << ", ";
3235  o << std::setbase(10);
3236  o << "chunks: [";
3237  uint64_t size = 0;
3238  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3239  {
3240  size += dataChunk[i].rlen;
3241  o << "(offset: " << dataChunk[i].offset;
3242  o << ", size: " << dataChunk[i].rlen << "); ";
3243  }
3244  o << "], ";
3245  o << "total size: " << size << ")";
3246  break;
3247  }
3248 
3249  //------------------------------------------------------------------------
3250  // kXR_writev
3251  //------------------------------------------------------------------------
3252  case kXR_writev:
3253  {
3254  unsigned char *fhandle = 0;
3255  o << "kXR_writev (";
3256 
3257  XrdProto::write_list *wrtList =
3258  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3259  uint64_t size = 0;
3260  uint32_t numChunks = 0;
3261  for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3262  {
3263  fhandle = wrtList[i].fhandle;
3264  size += wrtList[i].wlen;
3265  ++numChunks;
3266  }
3267  o << "handle: ";
3268  if( fhandle )
3269  o << FileHandleToStr( fhandle );
3270  else
3271  o << "unknown";
3272  o << ", ";
3273  o << std::setbase(10);
3274  o << "chunks: " << numChunks << ", ";
3275  o << "total size: " << size << ")";
3276  break;
3277  }
3278 
3279  //------------------------------------------------------------------------
3280  // kXR_locate
3281  //------------------------------------------------------------------------
3282  case kXR_locate:
3283  {
3285  char *fn = GetDataAsString( msg );;
3286  o << "kXR_locate (";
3287  o << "path: " << fn << ", ";
3288  delete [] fn;
3289  o << "flags: ";
3290  if( sreq->options == 0 )
3291  o << "none";
3292  else
3293  {
3294  if( sreq->options & kXR_refresh )
3295  o << "kXR_refresh ";
3296  if( sreq->options & kXR_prefname )
3297  o << "kXR_prefname ";
3298  if( sreq->options & kXR_nowait )
3299  o << "kXR_nowait ";
3300  if( sreq->options & kXR_force )
3301  o << "kXR_force ";
3302  if( sreq->options & kXR_compress )
3303  o << "kXR_compress ";
3304  }
3305  o << ")";
3306  break;
3307  }
3308 
3309  //------------------------------------------------------------------------
3310  // kXR_mv
3311  //------------------------------------------------------------------------
3312  case kXR_mv:
3313  {
3314  ClientMvRequest *sreq = (ClientMvRequest *)msg;
3315  o << "kXR_mv (";
3316  o << "source: ";
3317  o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3318  o << ", ";
3319  o << "destination: ";
3320  o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3321  o << ")";
3322  break;
3323  }
3324 
3325  //------------------------------------------------------------------------
3326  // kXR_query
3327  //------------------------------------------------------------------------
3328  case kXR_query:
3329  {
3330  ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3331  o << "kXR_query (";
3332  o << "code: ";
3333  switch( sreq->infotype )
3334  {
3335  case kXR_Qconfig: o << "kXR_Qconfig"; break;
3336  case kXR_Qckscan: o << "kXR_Qckscan"; break;
3337  case kXR_Qcksum: o << "kXR_Qcksum"; break;
3338  case kXR_Qopaque: o << "kXR_Qopaque"; break;
3339  case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3340  case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3341  case kXR_QPrep: o << "kXR_QPrep"; break;
3342  case kXR_Qspace: o << "kXR_Qspace"; break;
3343  case kXR_QStats: o << "kXR_QStats"; break;
3344  case kXR_Qvisa: o << "kXR_Qvisa"; break;
3345  case kXR_Qxattr: o << "kXR_Qxattr"; break;
3346  default: o << sreq->infotype; break;
3347  }
3348  o << ", ";
3349 
3350  if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3351  {
3352  o << "handle: " << FileHandleToStr( sreq->fhandle );
3353  o << ", ";
3354  }
3355 
3356  o << "arg length: " << sreq->dlen << ")";
3357  break;
3358  }
3359 
3360  //------------------------------------------------------------------------
3361  // kXR_rm
3362  //------------------------------------------------------------------------
3363  case kXR_rm:
3364  {
3365  o << "kXR_rm (";
3366  char *fn = GetDataAsString( msg );;
3367  o << "path: " << fn << ")";
3368  delete [] fn;
3369  break;
3370  }
3371 
3372  //------------------------------------------------------------------------
3373  // kXR_mkdir
3374  //------------------------------------------------------------------------
3375  case kXR_mkdir:
3376  {
3377  ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3378  o << "kXR_mkdir (";
3379  char *fn = GetDataAsString( msg );
3380  o << "path: " << fn << ", ";
3381  delete [] fn;
3382  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3383  o << std::setbase(10);
3384  o << "flags: ";
3385  if( sreq->options[0] == 0 )
3386  o << "none";
3387  else
3388  {
3389  if( sreq->options[0] & kXR_mkdirpath )
3390  o << "kXR_mkdirpath";
3391  }
3392  o << ")";
3393  break;
3394  }
3395 
3396  //------------------------------------------------------------------------
3397  // kXR_rmdir
3398  //------------------------------------------------------------------------
3399  case kXR_rmdir:
3400  {
3401  o << "kXR_rmdir (";
3402  char *fn = GetDataAsString( msg );
3403  o << "path: " << fn << ")";
3404  delete [] fn;
3405  break;
3406  }
3407 
3408  //------------------------------------------------------------------------
3409  // kXR_chmod
3410  //------------------------------------------------------------------------
3411  case kXR_chmod:
3412  {
3413  ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3414  o << "kXR_chmod (";
3415  char *fn = GetDataAsString( msg );
3416  o << "path: " << fn << ", ";
3417  delete [] fn;
3418  o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3419  break;
3420  }
3421 
3422  //------------------------------------------------------------------------
3423  // kXR_ping
3424  //------------------------------------------------------------------------
3425  case kXR_ping:
3426  {
3427  o << "kXR_ping ()";
3428  break;
3429  }
3430 
3431  //------------------------------------------------------------------------
3432  // kXR_protocol
3433  //------------------------------------------------------------------------
3434  case kXR_protocol:
3435  {
3437  o << "kXR_protocol (";
3438  o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3439  break;
3440  }
3441 
3442  //------------------------------------------------------------------------
3443  // kXR_dirlist
3444  //------------------------------------------------------------------------
3445  case kXR_dirlist:
3446  {
3447  o << "kXR_dirlist (";
3448  char *fn = GetDataAsString( msg );;
3449  o << "path: " << fn << ")";
3450  delete [] fn;
3451  break;
3452  }
3453 
3454  //------------------------------------------------------------------------
3455  // kXR_set
3456  //------------------------------------------------------------------------
3457  case kXR_set:
3458  {
3459  o << "kXR_set (";
3460  char *fn = GetDataAsString( msg );;
3461  o << "data: " << fn << ")";
3462  delete [] fn;
3463  break;
3464  }
3465 
3466  //------------------------------------------------------------------------
3467  // kXR_prepare
3468  //------------------------------------------------------------------------
3469  case kXR_prepare:
3470  {
3472  o << "kXR_prepare (";
3473  o << "flags: ";
3474 
3475  if( sreq->options == 0 )
3476  o << "none";
3477  else
3478  {
3479  if( sreq->options & kXR_stage )
3480  o << "kXR_stage ";
3481  if( sreq->options & kXR_wmode )
3482  o << "kXR_wmode ";
3483  if( sreq->options & kXR_coloc )
3484  o << "kXR_coloc ";
3485  if( sreq->options & kXR_fresh )
3486  o << "kXR_fresh ";
3487  }
3488 
3489  o << ", priority: " << (int) sreq->prty << ", ";
3490 
3491  char *fn = GetDataAsString( msg );
3492  char *cursor;
3493  for( cursor = fn; *cursor; ++cursor )
3494  if( *cursor == '\n' ) *cursor = ' ';
3495 
3496  o << "paths: " << fn << ")";
3497  delete [] fn;
3498  break;
3499  }
3500 
3501  case kXR_chkpoint:
3502  {
3504  o << "kXR_chkpoint (";
3505  o << "opcode: ";
3506  if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3507  else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3508  else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3509  else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3510  else if( sreq->opcode == kXR_ckpXeq )
3511  {
3512  o << "kXR_ckpXeq) ";
3513  // In this case our request body will be one of kXR_pgwrite,
3514  // kXR_truncate, kXR_write, or kXR_writev request.
3515  GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3516  }
3517 
3518  break;
3519  }
3520 
3521  //------------------------------------------------------------------------
3522  // Default
3523  //------------------------------------------------------------------------
3524  default:
3525  {
3526  o << "kXR_unknown (length: " << req->dlen << ")";
3527  break;
3528  }
3529  };
3530  }
static const int kXR_ckpRollback
Definition: XProtocol.hh:215
kXR_int16 arg1len
Definition: XProtocol.hh:430
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_char fhandle[4]
Definition: XProtocol.hh:782
kXR_char fhandle[4]
Definition: XProtocol.hh:807
kXR_char fhandle[4]
Definition: XProtocol.hh:771
kXR_int32 dlen
Definition: XProtocol.hh:431
kXR_int64 offset
Definition: XProtocol.hh:646
kXR_unt16 options
Definition: XProtocol.hh:481
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_open_wrto
Definition: XProtocol.hh:469
@ kXR_compress
Definition: XProtocol.hh:452
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_prefname
Definition: XProtocol.hh:461
@ kXR_nowait
Definition: XProtocol.hh:467
@ kXR_open_read
Definition: XProtocol.hh:456
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_mkpath
Definition: XProtocol.hh:460
@ kXR_seqio
Definition: XProtocol.hh:468
@ kXR_replica
Definition: XProtocol.hh:465
@ kXR_posc
Definition: XProtocol.hh:466
@ kXR_refresh
Definition: XProtocol.hh:459
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_force
Definition: XProtocol.hh:454
@ kXR_4dirlist
Definition: XProtocol.hh:464
@ kXR_open_apnd
Definition: XProtocol.hh:462
@ kXR_retstat
Definition: XProtocol.hh:463
kXR_char fhandle[4]
Definition: XProtocol.hh:509
kXR_unt16 infotype
Definition: XProtocol.hh:631
kXR_char fhandle[4]
Definition: XProtocol.hh:645
kXR_char fhandle[4]
Definition: XProtocol.hh:659
kXR_char fhandle[4]
Definition: XProtocol.hh:229
kXR_unt16 requestid
Definition: XProtocol.hh:157
kXR_char fhandle[4]
Definition: XProtocol.hh:633
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_ping
Definition: XProtocol.hh:123
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
@ kXR_prepare
Definition: XProtocol.hh:133
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_char fhandle[4]
Definition: XProtocol.hh:794
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_char options[1]
Definition: XProtocol.hh:416
static const int kXR_ckpCommit
Definition: XProtocol.hh:213
kXR_int64 offset
Definition: XProtocol.hh:661
@ kXR_vfs
Definition: XProtocol.hh:763
@ kXR_mkdirpath
Definition: XProtocol.hh:410
@ kXR_wmode
Definition: XProtocol.hh:591
@ kXR_fresh
Definition: XProtocol.hh:593
@ kXR_coloc
Definition: XProtocol.hh:592
@ kXR_stage
Definition: XProtocol.hh:590
static const int kXR_ckpQuery
Definition: XProtocol.hh:214
kXR_int64 offset
Definition: XProtocol.hh:808
kXR_int32 dlen
Definition: XProtocol.hh:772
kXR_char options
Definition: XProtocol.hh:769
kXR_int32 rlen
Definition: XProtocol.hh:647
@ kXR_QPrep
Definition: XProtocol.hh:616
@ kXR_Qopaqug
Definition: XProtocol.hh:625
@ kXR_Qconfig
Definition: XProtocol.hh:621
@ kXR_Qopaquf
Definition: XProtocol.hh:624
@ kXR_Qckscan
Definition: XProtocol.hh:620
@ kXR_Qxattr
Definition: XProtocol.hh:618
@ kXR_Qspace
Definition: XProtocol.hh:619
@ kXR_Qvisa
Definition: XProtocol.hh:622
@ kXR_QStats
Definition: XProtocol.hh:615
@ kXR_Qcksum
Definition: XProtocol.hh:617
@ kXR_Qopaque
Definition: XProtocol.hh:623
kXR_int32 dlen
Definition: XProtocol.hh:159
static const int kXR_ckpBegin
Definition: XProtocol.hh:212
@ ErrorMsg
report errors
Definition: XrdClLog.hh:109
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
XrdSysError Log
Definition: XrdConfig.cc:113
kXR_char fhandle[4]
Definition: XProtocol.hh:832
kXR_char fhandle[4]
Definition: XProtocol.hh:288

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdCl::Log::ErrorMsg, ClientCloseRequest::fhandle, ClientFattrRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, readahead_list::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, XrdProto::write_list::fhandle, XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_4dirlist, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_fattr, kXR_fattrDel, kXR_fattrGet, kXR_fattrList, kXR_fattrSet, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_open_wrto, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientFattrRequest::numattr, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, ClientChkPointRequest::opcode, ClientFattrRequest::options, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, ClientPrepareRequest::options, ClientStatRequest::options, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientFattrRequest::subcode, and XrdProto::write_list::wlen.

Referenced by SetDescription().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL url,
AnyObject channelData 
)
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1911 of file XrdClXRootDTransport.cc.

1913  {
1914  XRootDChannelInfo *info = 0;
1915  channelData.Get( info );
1916 
1917  if(!info || !info->bindSelector)
1918  return url;
1919 
1920  return URL( info->bindSelector->Get() );
1921  }

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message message,
Socket socket 
)
virtual

Read the message body from the socket, the socket is non-blocking, the method may be called multiple times - see GetHeader for details

Parameters
messagethe message buffer containing the header
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 347 of file XrdClXRootDTransport.cc.

348  {
349  //--------------------------------------------------------------------------
350  // Retrieve the body
351  //--------------------------------------------------------------------------
352  size_t leftToBeRead = 0;
353  uint32_t bodySize = 0;
354  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
355  bodySize = rsphdr->dlen;
356 
357  if( message.GetSize() < bodySize + 8 )
358  message.ReAllocate( bodySize + 8 );
359 
360  leftToBeRead = bodySize-(message.GetCursor()-8);
361  while( leftToBeRead )
362  {
363  int bytesRead = 0;
364  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365 
366  if( !status.IsOK() || status.code == suRetry )
367  return status;
368 
369  leftToBeRead -= bytesRead;
370  message.AdvanceCursor( bytesRead );
371  }
372 
373  return XRootDStatus( stOK, suDone );
374  }
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t suDone
Definition: XrdClStatus.hh:38

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.

+ Here is the call graph for this function:

◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message message,
Socket socket 
)
virtual

Read a message header from the socket, the socket is non-blocking, so if there is not enough data the function should return suRetry in which case it will be called again when more data arrives, with the data previously read stored in the message buffer

Parameters
messagethe message buffer
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 307 of file XrdClXRootDTransport.cc.

308  {
309  //--------------------------------------------------------------------------
310  // A new message - allocate the space needed for the header
311  //--------------------------------------------------------------------------
312  if( message.GetCursor() == 0 && message.GetSize() < 8 )
313  message.Allocate( 8 );
314 
315  //--------------------------------------------------------------------------
316  // Read the message header
317  //--------------------------------------------------------------------------
318  if( message.GetCursor() < 8 )
319  {
320  size_t leftToBeRead = 8 - message.GetCursor();
321  while( leftToBeRead )
322  {
323  int bytesRead = 0;
324  XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325  leftToBeRead, bytesRead );
326  if( !status.IsOK() || status.code == suRetry )
327  return status;
328 
329  leftToBeRead -= bytesRead;
330  message.AdvanceCursor( bytesRead );
331  }
332  UnMarshallHeader( message );
333 
334  uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335  Log *log = DefaultEnv::GetLog();
336  log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337  "body", &message, bodySize );
338 
339  return XRootDStatus( stOK, suDone );
340  }
341  return XRootDStatus( stError, errInternal );
342  }
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message message,
Socket socket 
)
virtual

Read more of the message body from the socket, the socket is non-blocking the method may be called multiple times - see GetHeader for details

Parameters
messagethe message buffer containing the header
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 379 of file XrdClXRootDTransport.cc.

380  {
381  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
382  if( rsphdr->status != kXR_status )
383  return XRootDStatus( stError, errInvalidOp );
384 
385  //--------------------------------------------------------------------------
386  // In case of non kXR_status responses we read all the response, including
387  // data. For kXR_status responses we first read only the remainder of the
388  // header. The header must then be unmarshalled, and then a second call to
389  // GetMore (repeated for suRetry as needed) will read the data.
390  //--------------------------------------------------------------------------
391 
392  uint32_t bodySize = rsphdr->dlen;
393  if( bodySize+8 < sizeof( ServerResponseStatus ) )
394  return XRootDStatus( stError, errInvalidMessage, 0,
395  "kXR_status: invalid message size." );
396 
397  ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
398  bodySize += rspst->bdy.dlen;
399 
400  if( message.GetSize() < bodySize + 8 )
401  message.ReAllocate( bodySize + 8 );
402 
403  size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404  while( leftToBeRead )
405  {
406  int bytesRead = 0;
407  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408 
409  if( !status.IsOK() || status.code == suRetry )
410  return status;
411 
412  leftToBeRead -= bytesRead;
413  message.AdvanceCursor( bytesRead );
414  }
415 
416  // Unmarchal to message body
417  Log *log = DefaultEnv::GetLog();
418  XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
419  if( !st.IsOK() && st.code == errDataError )
420  {
421  log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422  st.GetErrorMessage().c_str() );
423  return st;
424  }
425 
426  if( !st.IsOK() )
427  {
428  log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429  &message );
430  return st;
431  }
432 
433  return XRootDStatus( stOK, suDone );
434  }
@ kXR_status
Definition: XProtocol.hh:907
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message toSign,
Message *&  sign,
AnyObject channelData 
)
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1772 of file XrdClXRootDTransport.cc.

1773  {
1774  XRootDChannelInfo *info = 0;
1775  channelData.Get( info );
1776  return GetSignature( toSign, sign, info );
1777  }
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.

References XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message toSign,
Message *&  sign,
XRootDChannelInfo info 
)
virtual

Get signature for given message.

Definition at line 1782 of file XrdClXRootDTransport.cc.

1785  {
1786  XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1787  if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1788 
1789  ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1790  if( !info ) return Status( stError, errInternal );
1791  if( info->protection )
1792  {
1793  SecurityRequest *newreq = 0;
1794  // check if we have to secure the request in the first place
1795  if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1796  // secure (sign/encrypt) the request
1797  int rc = info->protection->Secure( newreq, *thereq, 0 );
1798  // there was an error
1799  if( rc < 0 )
1800  return Status( stError, errInternal, -rc );
1801 
1802  sign = new Message();
1803  sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1804  }
1805 
1806  return Status();
1807  }
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), XrdCl::PluginUnloadHandler::lock, NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), XrdCl::stError, and XrdCl::PluginUnloadHandler::unloaded.

+ Here is the call graph for this function:

◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData handShakeData,
AnyObject channelData 
)
virtual

HandShake.

Implements XrdCl::TransportHandler.

Definition at line 467 of file XrdClXRootDTransport.cc.

469  {
470  XRootDChannelInfo *info = 0;
471  channelData.Get( info );
472 
473  if (!info)
474  return XRootDStatus(stFatal, errInternal);
475 
476  XrdSysMutexHelper scopedLock( info->mutex );
477 
478  if( info->stream.size() <= handShakeData->subStreamId )
479  {
480  Log *log = DefaultEnv::GetLog();
481  log->Error( XRootDTransportMsg,
482  "[%s] Internal error: not enough substreams",
483  handShakeData->streamName.c_str() );
484  return XRootDStatus( stFatal, errInternal );
485  }
486 
487  if( handShakeData->subStreamId == 0 )
488  {
489  info->streamName = handShakeData->streamName;
490  return HandShakeMain( handShakeData, channelData );
491  }
492  return HandShakeParallel( handShakeData, channelData );
493  }
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData handShakeData,
AnyObject channelData 
)
virtual

Implements XrdCl::TransportHandler.

Definition at line 746 of file XrdClXRootDTransport.cc.

748  {
749  XRootDChannelInfo *info = 0;
750  channelData.Get( info );
751 
752  if (!info) {
754  "[%s] Internal error: no channel info",
755  handShakeData->streamName.c_str());
756  return false;
757  }
758 
759  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
760  return ( sInfo.status == XRootDStreamInfo::Connected );
761  }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL url,
AnyObject channelData 
)
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 439 of file XrdClXRootDTransport.cc.

441  {
442  XRootDChannelInfo *info = new XRootDChannelInfo( url );
443  XrdSysMutexHelper scopedLock( info->mutex );
444  channelData.Set( info );
445 
446  Env *env = DefaultEnv::GetEnv();
447  int streams = DefaultSubStreamsPerChannel;
448  env->GetInt( "SubStreamsPerChannel", streams );
449  if( streams < 1 ) streams = 1;
450  info->stream.resize( streams );
451  info->strmSelector.reset( new StreamSelector( streams ) );
452  info->encrypted = url.IsSecure();
453  info->istpc = url.IsTPC();
454  info->logintoken = url.GetLoginToken();
455  }
static Env * GetEnv()
Get default client environment.
const int DefaultSubStreamsPerChannel

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

+ Here is the call graph for this function:

◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t  inactiveTime,
AnyObject channelData 
)
virtual

Check the stream is broken - ie. TCP connection got broken and went undetected by the TCP stack

Implements XrdCl::TransportHandler.

Definition at line 819 of file XrdClXRootDTransport.cc.

821  {
822  XRootDChannelInfo *info = 0;
823  channelData.Get( info );
824  Env *env = DefaultEnv::GetEnv();
825  Log *log = DefaultEnv::GetLog();
826 
827  if (!info) {
828  log->Error(XRootDTransportMsg,
829  "Internal error: no channel info, behaving as if stream is broken");
830  return true;
831  }
832 
833  int streamTimeout = DefaultStreamTimeout;
834  env->GetInt( "StreamTimeout", streamTimeout );
835 
836  XrdSysMutexHelper scopedLock( info->mutex );
837 
838  const time_t now = time(0);
839  const bool anySID =
840  info->sidManager->IsAnySIDOldAs( now - streamTimeout );
841 
842  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
843  "stream timeout: %d, any SID: %d, wait barrier: %s",
844  info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
845  anySID, Utils::TimeToString(info->waitBarrier).c_str() );
846 
847  if( inactiveTime < streamTimeout )
848  return Status();
849 
850  if( now < info->waitBarrier )
851  return Status();
852 
853  if( !anySID )
854  return Status();
855 
856  return Status( stError, errSocketTimeout );
857  }
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
Definition: XrdClUtils.cc:256
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
const int DefaultStreamTimeout

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t  time,
AnyObject channelData 
)
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 766 of file XrdClXRootDTransport.cc.

768  {
769  XRootDChannelInfo *info = 0;
770  channelData.Get( info );
771 
772  Env *env = DefaultEnv::GetEnv();
773  Log *log = DefaultEnv::GetLog();
774 
775  if (!info) {
776  log->Error(XRootDTransportMsg,
777  "Internal error: no channel info, behaving as if TTL has elapsed");
778  return true;
779  }
780 
781  //--------------------------------------------------------------------------
782  // Check the TTL settings for the current server
783  //--------------------------------------------------------------------------
784  int ttl;
785  if( info->serverFlags & kXR_isServer )
786  {
787  ttl = DefaultDataServerTTL;
788  env->GetInt( "DataServerTTL", ttl );
789  }
790  else
791  {
793  env->GetInt( "LoadBalancerTTL", ttl );
794  }
795 
796  //--------------------------------------------------------------------------
797  // See whether we can give a go-ahead for the disconnection
798  //--------------------------------------------------------------------------
799  XrdSysMutexHelper scopedLock( info->mutex );
800  uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
801  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
802  "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
803  info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
804  info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
805 
806  if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
807  return false;
808 
809  if( !allocatedSIDs && inactiveTime > ttl )
810  return true;
811 
812  return false;
813  }
#define kXR_isServer
Definition: XProtocol.hh:1157
const int DefaultLoadBalancerTTL
const int DefaultDataServerTTL

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message msg)
static

Log server error response.

Definition at line 1507 of file XrdClXRootDTransport.cc.

1508  {
1509  Log *log = DefaultEnv::GetLog();
1510  ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1511  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1512  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1513  log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1514  rsp->body.error.errnum, errmsg );
1515  delete [] errmsg;
1516  }
union ServerResponse::@0 body
ServerResponseHeader hdr
Definition: XProtocol.hh:1288

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char *  msg)
static

Marshal the outgoing message.

Definition at line 1103 of file XrdClXRootDTransport.cc.

1104  {
1105  ClientRequest *req = (ClientRequest*)msg;
1106  switch( req->header.requestid )
1107  {
1108  //------------------------------------------------------------------------
1109  // kXR_protocol
1110  //------------------------------------------------------------------------
1111  case kXR_protocol:
1112  req->protocol.clientpv = htonl( req->protocol.clientpv );
1113  break;
1114 
1115  //------------------------------------------------------------------------
1116  // kXR_login
1117  //------------------------------------------------------------------------
1118  case kXR_login:
1119  req->login.pid = htonl( req->login.pid );
1120  break;
1121 
1122  //------------------------------------------------------------------------
1123  // kXR_locate
1124  //------------------------------------------------------------------------
1125  case kXR_locate:
1126  req->locate.options = htons( req->locate.options );
1127  break;
1128 
1129  //------------------------------------------------------------------------
1130  // kXR_query
1131  //------------------------------------------------------------------------
1132  case kXR_query:
1133  req->query.infotype = htons( req->query.infotype );
1134  break;
1135 
1136  //------------------------------------------------------------------------
1137  // kXR_truncate
1138  //------------------------------------------------------------------------
1139  case kXR_truncate:
1140  req->truncate.offset = htonll( req->truncate.offset );
1141  break;
1142 
1143  //------------------------------------------------------------------------
1144  // kXR_mkdir
1145  //------------------------------------------------------------------------
1146  case kXR_mkdir:
1147  req->mkdir.mode = htons( req->mkdir.mode );
1148  break;
1149 
1150  //------------------------------------------------------------------------
1151  // kXR_chmod
1152  //------------------------------------------------------------------------
1153  case kXR_chmod:
1154  req->chmod.mode = htons( req->chmod.mode );
1155  break;
1156 
1157  //------------------------------------------------------------------------
1158  // kXR_open
1159  //------------------------------------------------------------------------
1160  case kXR_open:
1161  req->open.mode = htons( req->open.mode );
1162  req->open.options = htons( req->open.options );
1163  break;
1164 
1165  //------------------------------------------------------------------------
1166  // kXR_read
1167  //------------------------------------------------------------------------
1168  case kXR_read:
1169  req->read.offset = htonll( req->read.offset );
1170  req->read.rlen = htonl( req->read.rlen );
1171  break;
1172 
1173  //------------------------------------------------------------------------
1174  // kXR_write
1175  //------------------------------------------------------------------------
1176  case kXR_write:
1177  req->write.offset = htonll( req->write.offset );
1178  break;
1179 
1180  //------------------------------------------------------------------------
1181  // kXR_mv
1182  //------------------------------------------------------------------------
1183  case kXR_mv:
1184  req->mv.arg1len = htons( req->mv.arg1len );
1185  break;
1186 
1187  //------------------------------------------------------------------------
1188  // kXR_readv
1189  //------------------------------------------------------------------------
1190  case kXR_readv:
1191  {
1192  uint16_t numChunks = (req->readv.dlen)/16;
1193  readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1194  for( size_t i = 0; i < numChunks; ++i )
1195  {
1196  dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1197  dataChunk[i].offset = htonll( dataChunk[i].offset );
1198  }
1199  break;
1200  }
1201 
1202  //------------------------------------------------------------------------
1203  // kXR_writev
1204  //------------------------------------------------------------------------
1205  case kXR_writev:
1206  {
1207  uint16_t numChunks = (req->writev.dlen)/16;
1208  XrdProto::write_list *wrtList =
1209  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1210  for( size_t i = 0; i < numChunks; ++i )
1211  {
1212  wrtList[i].wlen = htonl( wrtList[i].wlen );
1213  wrtList[i].offset = htonll( wrtList[i].offset );
1214  }
1215 
1216  break;
1217  }
1218 
1219  case kXR_pgread:
1220  {
1221  req->pgread.offset = htonll( req->pgread.offset );
1222  req->pgread.rlen = htonl( req->pgread.rlen );
1223  break;
1224  }
1225 
1226  case kXR_pgwrite:
1227  {
1228  req->pgwrite.offset = htonll( req->pgwrite.offset );
1229  break;
1230  }
1231 
1232  //------------------------------------------------------------------------
1233  // kXR_prepare
1234  //------------------------------------------------------------------------
1235  case kXR_prepare:
1236  {
1237  req->prepare.optionX = htons( req->prepare.optionX );
1238  req->prepare.port = htons( req->prepare.port );
1239  break;
1240  }
1241 
1242  case kXR_chkpoint:
1243  {
1244  if( req->chkpoint.opcode == kXR_ckpXeq )
1245  MarshallRequest( msg + 24 );
1246  break;
1247  }
1248  };
1249 
1250  req->header.requestid = htons( req->header.requestid );
1251  req->header.dlen = htonl( req->header.dlen );
1252  return XRootDStatus();
1253  }
struct ClientTruncateRequest truncate
Definition: XProtocol.hh:875
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
struct ClientMkdirRequest mkdir
Definition: XProtocol.hh:858
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
struct ClientReadVRequest readv
Definition: XProtocol.hh:868
struct ClientOpenRequest open
Definition: XProtocol.hh:860
struct ClientRequestHdr header
Definition: XProtocol.hh:846
struct ClientWriteVRequest writev
Definition: XProtocol.hh:877
struct ClientLoginRequest login
Definition: XProtocol.hh:857
@ kXR_login
Definition: XProtocol.hh:119
struct ClientChmodRequest chmod
Definition: XProtocol.hh:850
struct ClientQueryRequest query
Definition: XProtocol.hh:866
struct ClientReadRequest read
Definition: XProtocol.hh:867
struct ClientMvRequest mv
Definition: XProtocol.hh:859
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
struct ClientPrepareRequest prepare
Definition: XProtocol.hh:864
struct ClientWriteRequest write
Definition: XProtocol.hh:876
struct ClientProtocolRequest protocol
Definition: XProtocol.hh:865
struct ClientLocateRequest locate
Definition: XProtocol.hh:856
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientReadVRequest::dlen, ClientWriteVRequest::dlen, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.

+ Here is the call graph for this function:

◆ MarshallRequest() [2/2]

static XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message msg)
inlinestatic

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

176  {
177  MarshallRequest( msg->GetBuffer() );
178  msg->SetIsMarshalled( true );
179  return XRootDStatus();
180  }

References XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message msg,
uint16_t  subStream,
AnyObject channelData 
)
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1630 of file XrdClXRootDTransport.cc.

1633  {
1634  XRootDChannelInfo *info = 0;
1635  channelData.Get( info );
1636  XrdSysMutexHelper scopedLock( info->mutex );
1637  Log *log = DefaultEnv::GetLog();
1638 
1639  //--------------------------------------------------------------------------
1640  // Update the substream queues
1641  //--------------------------------------------------------------------------
1642  info->strmSelector->MsgReceived( subStream );
1643 
1644  //--------------------------------------------------------------------------
1645  // Check whether this message is a response to a request that has
1646  // timed out, and if so, drop it
1647  //--------------------------------------------------------------------------
1648  ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1649  if( rsp->hdr.status == kXR_attn )
1650  {
1651  return NoAction;
1652  }
1653 
1654  if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1655  {
1656  log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1657  "response that we're no longer interested in (timed out)",
1658  &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1659  //------------------------------------------------------------------------
1660  // If it is kXR_waitresp there will be another one,
1661  // so we don't release the sid yet
1662  //------------------------------------------------------------------------
1663  if( rsp->hdr.status != kXR_waitresp )
1664  info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1665  //------------------------------------------------------------------------
1666  // If it is a successful response to an open request
1667  // that timed out, we need to send a close
1668  //------------------------------------------------------------------------
1669  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1670  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1671  if( sidIt != info->sentOpens.end() )
1672  {
1673  info->sentOpens.erase( sidIt );
1674  if( rsp->hdr.status == kXR_ok ) return RequestClose;
1675  }
1676  return DigestMsg;
1677  }
1678 
1679  //--------------------------------------------------------------------------
1680  // If we have a wait or waitresp
1681  //--------------------------------------------------------------------------
1682  uint32_t seconds = 0;
1683  if( rsp->hdr.status == kXR_wait )
1684  seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1685  // to re-send the request
1686  else if( rsp->hdr.status == kXR_waitresp )
1687  {
1688  seconds = ntohl( rsp->body.waitresp.seconds );
1689 
1690  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1691  "setting up wait barrier.",
1692  info->streamName.c_str(),
1693  seconds );
1694  }
1695 
1696  time_t barrier = time(0) + seconds;
1697  if( info->waitBarrier < barrier )
1698  info->waitBarrier = barrier;
1699 
1700  //--------------------------------------------------------------------------
1701  // If we got a response to an open request, we may need to bump the counter
1702  // of open files
1703  //--------------------------------------------------------------------------
1704  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1705  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1706  if( sidIt != info->sentOpens.end() )
1707  {
1708  if( rsp->hdr.status == kXR_waitresp )
1709  return NoAction;
1710  info->sentOpens.erase( sidIt );
1711  if( rsp->hdr.status == kXR_ok )
1712  {
1713  ++info->openFiles;
1714  info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1715  }
1716  return NoAction;
1717  }
1718 
1719  //--------------------------------------------------------------------------
1720  // If we got a response to a close, we may need to decrement the counter of
1721  // open files
1722  //--------------------------------------------------------------------------
1723  sidIt = info->sentCloses.find( sid );
1724  if( sidIt != info->sentCloses.end() )
1725  {
1726  if( rsp->hdr.status == kXR_waitresp )
1727  return NoAction;
1728  info->sentCloses.erase( sidIt );
1729  --info->openFiles;
1730  return NoAction;
1731  }
1732  return NoAction;
1733  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ RequestClose
Send a close request.
const uint64_t XRootDMsg

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message msg,
uint16_t  subStream,
uint32_t  bytesSent,
AnyObject channelData 
)
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1738 of file XrdClXRootDTransport.cc.

1742  {
1743  // Called when a message has been sent. For messages that return on a
1744  // different pathid (and hence may use a different poller) it is possible
1745  // that the server has already replied and the reply will trigger
1746  // MessageReceived() before this method has been called. However for open
1747  // and close this is never the case and this method is used for tracking
1748  // only those.
1749  XRootDChannelInfo *info = 0;
1750  channelData.Get( info );
1751  XrdSysMutexHelper scopedLock( info->mutex );
1752  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1753  uint16_t reqid = ntohs( req->header.requestid );
1754 
1755 
1756  //--------------------------------------------------------------------------
1757  // We need to track opens to know if we can close streams due to idleness
1758  //--------------------------------------------------------------------------
1759  uint16_t sid;
1760  memcpy( &sid, req->header.streamid, 2 );
1761 
1762  if( reqid == kXR_open )
1763  info->sentOpens.insert( sid );
1764  else if( reqid == kXR_close )
1765  info->sentCloses.insert( sid );
1766  }
kXR_char streamid[2]
Definition: XProtocol.hh:156

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message msg,
AnyObject channelData,
PathID hint = 0 
)
virtual

Return the ID for the up stream this message should be sent by and the down stream which the answer should be expected at. Modify the message itself if necessary. If hint is non-zero then the message should be modified such that the answer will be returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 862 of file XrdClXRootDTransport.cc.

863  {
864  return PathID( 0, 0 );
865  }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message msg,
AnyObject channelData,
PathID hint = 0 
)
virtual

Return the ID for the up substream this message should be sent by and the down substream which the answer should be expected at. Modify the message itself if necessary. If hint is non-zero then the message should be modified such that the answer will be returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 870 of file XrdClXRootDTransport.cc.

873  {
874  XRootDChannelInfo *info = 0;
875  channelData.Get( info );
876 
877  if (!info) {
879  "Internal error: no channel info, cannot multiplex");
880  return PathID(0,0);
881  }
882 
883  XrdSysMutexHelper scopedLock( info->mutex );
884 
885  //--------------------------------------------------------------------------
886  // If we're not connected to a data server or we don't know that yet
887  // we stream through 0
888  //--------------------------------------------------------------------------
889  if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
890  return PathID( 0, 0 );
891 
892  //--------------------------------------------------------------------------
893  // Select the streams
894  //--------------------------------------------------------------------------
895  Log *log = DefaultEnv::GetLog();
896  uint16_t upStream = 0;
897  uint16_t downStream = 0;
898 
899  if( hint )
900  {
901  upStream = hint->up;
902  downStream = hint->down;
903  }
904  else
905  {
906  upStream = 0;
907  std::vector<bool> connected;
908  connected.reserve( info->stream.size() - 1 );
909  size_t nbConnected = 0;
910  for( size_t i = 1; i < info->stream.size(); ++i )
911  if( info->stream[i].status == XRootDStreamInfo::Connected )
912  {
913  connected.push_back( true );
914  ++nbConnected;
915  }
916  else
917  connected.push_back( false );
918 
919  if( nbConnected == 0 )
920  downStream = 0;
921  else
922  downStream = info->strmSelector->Select( connected );
923  }
924 
925  if( upStream >= info->stream.size() )
926  {
927  log->Debug( XRootDTransportMsg,
928  "[%s] Up link stream %d does not exist, using 0",
929  info->streamName.c_str(), upStream );
930  upStream = 0;
931  }
932 
933  if( downStream >= info->stream.size() )
934  {
935  log->Debug( XRootDTransportMsg,
936  "[%s] Down link stream %d does not exist, using 0",
937  info->streamName.c_str(), downStream );
938  downStream = 0;
939  }
940 
941  //--------------------------------------------------------------------------
942  // Modify the message
943  //--------------------------------------------------------------------------
944  UnMarshallRequest( msg );
945  ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
946  switch( hdr->requestid )
947  {
948  //------------------------------------------------------------------------
949  // Read - we update the path id to tell the server where we want to
950  // get the response, but we still send the request through stream 0
951  // We need to allocate space for read_args if we don't have it
952  // included yet
953  //------------------------------------------------------------------------
954  case kXR_read:
955  {
956  if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
957  {
958  msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
959  void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
960  memset( newBuf, 0, 8 );
961  ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
962  req->dlen += 8;
963  }
964  read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
965  args->pathid = info->stream[downStream].pathId;
966  break;
967  }
968 
969 
970  //------------------------------------------------------------------------
971  // PgRead - we update the path id to tell the server where we want to
972  // get the response, but we still send the request through stream 0
973  // We need to allocate space for ClientPgReadReqArgs if we don't have it
974  // included yet
975  //------------------------------------------------------------------------
976  case kXR_pgread:
977  {
978  if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
979  {
980  msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
981  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
982  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
983  ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
984  req->dlen += sizeof( ClientPgReadReqArgs );
985  }
986  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
987  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
988  args->pathid = info->stream[downStream].pathId;
989  break;
990  }
991 
992  //------------------------------------------------------------------------
993  // ReadV - the situation is identical to read but we don't need any
994  // additional structures to specify the return path
995  //------------------------------------------------------------------------
996  case kXR_readv:
997  {
998  ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
999  req->pathid = info->stream[downStream].pathId;
1000  break;
1001  }
1002 
1003  //------------------------------------------------------------------------
1004  // Write - multiplexing writes doesn't work properly in the server
1005  //------------------------------------------------------------------------
1006  case kXR_write:
1007  {
1008 // ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
1009 // req->pathid = info->stream[downStream].pathId;
1010  break;
1011  }
1012 
1013  //------------------------------------------------------------------------
1014  // WriteV - multiplexing writes doesn't work properly in the server
1015  //------------------------------------------------------------------------
1016  case kXR_writev:
1017  {
1018 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1019 // req->pathid = info->stream[downStream].pathId;
1020  break;
1021  }
1022 
1023  //------------------------------------------------------------------------
1024  // PgWrite - multiplexing writes doesn't work properly in the server
1025  //------------------------------------------------------------------------
1026  case kXR_pgwrite:
1027  {
1028 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1029 // req->pathid = info->stream[downStream].pathId;
1030  break;
1031  }
1032  };
1033  MarshallRequest( msg );
1034  return PathID( upStream, downStream );
1035  }
kXR_char pathid
Definition: XProtocol.hh:653
kXR_int32 dlen
Definition: XProtocol.hh:648
static XRootDStatus UnMarshallRequest(Message *msg)

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, read_args::pathid, ClientReadVRequest::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject channelData)
static

Number of currently connected data streams.

Definition at line 1521 of file XrdClXRootDTransport.cc.

1522  {
1523  XRootDChannelInfo *info = 0;
1524  channelData.Get( info );
1525 
1526  if (!info) {
1527  DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1528  return 0;
1529  }
1530 
1531  XrdSysMutexHelper scopedLock( info->mutex );
1532 
1533  uint16_t nbConnected = 0;
1534  for( size_t i = 1; i < info->stream.size(); ++i )
1535  if( info->stream[i].status == XRootDStreamInfo::Connected )
1536  ++nbConnected;
1537 
1538  return nbConnected;
1539  }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDTransportMsg.

Referenced by XrdCl::Channel::NbConnectedStrm().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return the information whether a control connection needs to be valid before establishing other connections

Definition at line 167 of file XrdClXRootDTransport.hh.

168  {
169  return true;
170  }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData handShakeData,
AnyObject channelData 
)
virtual
Returns
: true if encryption should be turned on, false otherwise

Implements XrdCl::TransportHandler.

Definition at line 1832 of file XrdClXRootDTransport.cc.

1834  {
1835  XRootDChannelInfo *info = 0;
1836  channelData.Get( info );
1837 
1839  int notlsok = DefaultNoTlsOK;
1840  env->GetInt( "NoTlsOK", notlsok );
1841 
1842 
1843  if( notlsok )
1844  return info->encrypted;
1845 
1846  // Did the server instructed us to switch to TLS right away?
1847  if( info->serverFlags & kXR_gotoTLS )
1848  {
1849  info->encrypted = true;
1850  return true ;
1851  }
1852 
1853  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1854 
1855  //--------------------------------------------------------------------------
1856  // The control stream (sub-stream 0) might need to switch to TLS before
1857  // login or after login
1858  //--------------------------------------------------------------------------
1859  if( handShakeData->subStreamId == 0 )
1860  {
1861  //------------------------------------------------------------------------
1862  // We are about to login and the server asked to start encrypting
1863  // before login
1864  //------------------------------------------------------------------------
1865  if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1866  ( info->serverFlags & kXR_tlsLogin ) )
1867  {
1868  info->encrypted = true;
1869  return true;
1870  }
1871 
1872  //--------------------------------------------------------------------
1873  // The hand-shake is done and the server requested to encrypt the session
1874  //--------------------------------------------------------------------
1875  if( (sInfo.status == XRootDStreamInfo::Connected ||
1876  //--------------------------------------------------------------------
1877  // we really need to turn on TLS before we sent kXR_endsess and we
1878  // are about to do so (1st enable encryption, then send kXR_endsess)
1879  //--------------------------------------------------------------------
1880  sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1881  ( info->serverFlags & kXR_tlsSess ) )
1882  {
1883  info->encrypted = true;
1884  return true;
1885  }
1886  }
1887  //--------------------------------------------------------------------------
1888  // A data stream (sub-stream > 0) if need be will be switched to TLS before
1889  // bind.
1890  //--------------------------------------------------------------------------
1891  else
1892  {
1893  //------------------------------------------------------------------------
1894  // We are about to bind a data stream and the server asked to start
1895  // encrypting before bind
1896  //------------------------------------------------------------------------
1897  if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1898  ( info->serverFlags & kXR_tlsData ) )
1899  {
1900  info->encrypted = true;
1901  return true;
1902  }
1903  }
1904 
1905  return false;
1906  }
#define kXR_tlsLogin
Definition: XProtocol.hh:1184
#define kXR_gotoTLS
Definition: XProtocol.hh:1180
#define kXR_tlsSess
Definition: XProtocol.hh:1185
#define kXR_tlsData
Definition: XProtocol.hh:1182
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
const int DefaultNoTlsOK

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.

+ Here is the call graph for this function:

◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t  query,
AnyObject result,
AnyObject channelData 
)
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1578 of file XrdClXRootDTransport.cc.

1581  {
1582  XRootDChannelInfo *info = 0;
1583  channelData.Get( info );
1584 
1585  if (!info)
1586  return XRootDStatus(stFatal, errInternal);
1587 
1588  XrdSysMutexHelper scopedLock( info->mutex );
1589 
1590  switch( query )
1591  {
1592  //------------------------------------------------------------------------
1593  // Protocol name
1594  //------------------------------------------------------------------------
1595  case TransportQuery::Name:
1596  result.Set( (const char*)"XRootD", false );
1597  return Status();
1598 
1599  //------------------------------------------------------------------------
1600  // Authentication
1601  //------------------------------------------------------------------------
1602  case TransportQuery::Auth:
1603  result.Set( new std::string( info->authProtocolName ), false );
1604  return Status();
1605 
1606  //------------------------------------------------------------------------
1607  // Server flags
1608  //------------------------------------------------------------------------
1610  result.Set( new int( info->serverFlags ), false );
1611  return Status();
1612 
1613  //------------------------------------------------------------------------
1614  // Protocol version
1615  //------------------------------------------------------------------------
1617  result.Set( new int( info->protocolVersion ), false );
1618  return Status();
1619 
1621  result.Set( new bool( info->encrypted ), false );
1622  return Status();
1623  };
1624  return Status( stError, errQueryNotSupported );
1625  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Transport name, returns std::string *.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errInternal, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), XrdCl::stError, and XrdCl::stFatal.

+ Here is the call graph for this function:

◆ SetDescription()

static void XrdCl::XRootDTransport::SetDescription ( Message msg)
inlinestatic

Get the description of a message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246  {
247  std::ostringstream o;
248  GenerateDescription( msg->GetBuffer(), o );
249  msg->SetDescription( o.str() );
250  }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::Open(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileSystem::Stat(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject channelData)
virtual

Return a number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 1042 of file XrdClXRootDTransport.cc.

1043  {
1044  XRootDChannelInfo *info = 0;
1045  channelData.Get( info );
1046 
1047  if (!info) {
1048  DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1049  return 1;
1050  }
1051 
1052  XrdSysMutexHelper scopedLock( info->mutex );
1053 
1054  //--------------------------------------------------------------------------
1055  // If the connection has been opened in order to orchestrate a TPC or
1056  // the remote server is a Manager or Metamanager we will need only one
1057  // (control) stream.
1058  //--------------------------------------------------------------------------
1059  if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1060 
1061  //--------------------------------------------------------------------------
1062  // Number of streams requested by user
1063  //--------------------------------------------------------------------------
1064  uint16_t ret = info->stream.size();
1065 
1067  int nodata = DefaultTlsNoData;
1068  env->GetInt( "TlsNoData", nodata );
1069 
1070  // Does the server require the stream 0 to be encrypted?
1071  bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1072  ( info->serverFlags & kXR_tlsLogin ) ||
1073  ( info->serverFlags & kXR_tlsSess );
1074  // Does the server NOT require the data streams to be encrypted?
1075  bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1076  // Does the user require the stream 0 to be encrypted?
1077  bool usrTlsStrm0 = info->encrypted;
1078  // Does the user NOT require the data streams to be encrypted?
1079  bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1080 
1081  if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1082  ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1083  {
1084  //------------------------------------------------------------------------
1085  // The server or user asked us to encrypt stream 0, but to send the data
1086  // (read/write) using a plain TCP connection
1087  //------------------------------------------------------------------------
1088  if( ret == 1 ) ++ret;
1089  }
1090 
1091  if( ret > info->stream.size() )
1092  {
1093  info->stream.resize( ret );
1094  info->strmSelector->AdjustQueues( ret );
1095  }
1096 
1097  return ret;
1098  }
const int DefaultTlsNoData

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::strmSelector, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1434 of file XrdClXRootDTransport.cc.

1435  {
1436  ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1437  uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1438 
1439  switch( reqType )
1440  {
1441  case kXR_pgwrite:
1442  {
1443  //--------------------------------------------------------------------------
1444  // If there's no additional data there's nothing to unmarshal
1445  //--------------------------------------------------------------------------
1446  if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1447  //--------------------------------------------------------------------------
1448  // If there's not enough data to form correction-segment report an error
1449  //--------------------------------------------------------------------------
1450  if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1451  return XRootDStatus( stError, errInvalidMessage, 0,
1452  "kXR_status: invalid message size." );
1453 
1454  //--------------------------------------------------------------------------
1455  // Calculate the crc32c for the additional data
1456  //--------------------------------------------------------------------------
1457  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) );
1458  cse->cseCRC = ntohl( cse->cseCRC );
1459  size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1460  void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1461  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1462 
1463  //--------------------------------------------------------------------------
1464  // Do the integrity checks
1465  //--------------------------------------------------------------------------
1466  if( crcval != cse->cseCRC )
1467  {
1468  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1469  "corrupted (crc32c integrity check failed)." );
1470  }
1471 
1472  cse->dlFirst = ntohs( cse->dlFirst );
1473  cse->dlLast = ntohs( cse->dlLast );
1474 
1475  size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1476  sizeof( kXR_int64 );
1477  kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1478  sizeof( ServerResponseBody_pgWrCSE ) );
1479 
1480  for( size_t i = 0; i < pgcnt; ++i )
1481  pgoffs[i] = ntohll( pgoffs[i] );
1482 
1483  return XRootDStatus();
1484  break;
1485  }
1486 
1487  default:
1488  break;
1489  }
1490 
1491  return XRootDStatus( stError, errNotSupported );
1492  }
ServerResponseStatus status
Definition: XProtocol.hh:1310
@ kXR_1stRequest
Definition: XProtocol.hh:111
long long kXR_int64
Definition: XPtypes.hh:98
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.

Referenced by GetMore().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message msg,
uint16_t  reqType 
)
static

Unmarshall the body of the incoming message.

Definition at line 1280 of file XrdClXRootDTransport.cc.

1281  {
1282  ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1283 
1284  //--------------------------------------------------------------------------
1285  // kXR_ok
1286  //--------------------------------------------------------------------------
1287  if( m->hdr.status == kXR_ok )
1288  {
1289  switch( reqType )
1290  {
1291  //----------------------------------------------------------------------
1292  // kXR_protocol
1293  //----------------------------------------------------------------------
1294  case kXR_protocol:
1295  if( m->hdr.dlen < 8 )
1296  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1297  m->body.protocol.pval = ntohl( m->body.protocol.pval );
1298  m->body.protocol.flags = ntohl( m->body.protocol.flags );
1299  break;
1300  }
1301  }
1302  //--------------------------------------------------------------------------
1303  // kXR_error
1304  //--------------------------------------------------------------------------
1305  else if( m->hdr.status == kXR_error )
1306  {
1307  if( m->hdr.dlen < 4 )
1308  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1309  m->body.error.errnum = ntohl( m->body.error.errnum );
1310  }
1311 
1312  //--------------------------------------------------------------------------
1313  // kXR_wait
1314  //--------------------------------------------------------------------------
1315  else if( m->hdr.status == kXR_wait )
1316  {
1317  if( m->hdr.dlen < 4 )
1318  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1319  m->body.wait.seconds = htonl( m->body.wait.seconds );
1320  }
1321 
1322  //--------------------------------------------------------------------------
1323  // kXR_redirect
1324  //--------------------------------------------------------------------------
1325  else if( m->hdr.status == kXR_redirect )
1326  {
1327  if( m->hdr.dlen < 4 )
1328  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1329  m->body.redirect.port = htonl( m->body.redirect.port );
1330  }
1331 
1332  //--------------------------------------------------------------------------
1333  // kXR_waitresp
1334  //--------------------------------------------------------------------------
1335  else if( m->hdr.status == kXR_waitresp )
1336  {
1337  if( m->hdr.dlen < 4 )
1338  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1339  m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1340  }
1341 
1342  //--------------------------------------------------------------------------
1343  // kXR_attn
1344  //--------------------------------------------------------------------------
1345  else if( m->hdr.status == kXR_attn )
1346  {
1347  if( m->hdr.dlen < 4 )
1348  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1349  m->body.attn.actnum = htonl( m->body.attn.actnum );
1350  }
1351 
1352  return XRootDStatus();
1353  }
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_error
Definition: XProtocol.hh:903

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message msg)
static

Unmarshall the header incoming message.

Definition at line 1497 of file XrdClXRootDTransport.cc.

1498  {
1499  ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1500  header->status = ntohs( header->status );
1501  header->dlen = ntohl( header->dlen );
1502  }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.

Referenced by GetHeader().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message msg)
static

Unmarshall the request - sometimes the requests need to be rewritten, so we need to unmarshall them

Definition at line 1259 of file XrdClXRootDTransport.cc.

1260  {
1261  if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1262  // We rely on the marshaling process to be symmetric!
1263  // First we unmarshall the request ID and the length because
1264  // MarshallRequest() relies on these, and then we need to unmarshall these
1265  // two again, because they get marshalled in MarshallRequest().
1266  // All this is pretty damn ugly and should be rewritten.
1267  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1268  req->header.requestid = htons( req->header.requestid );
1269  req->header.dlen = htonl( req->header.dlen );
1270  XRootDStatus st = MarshallRequest( msg );
1271  req->header.requestid = htons( req->header.requestid );
1272  req->header.dlen = htonl( req->header.dlen );
1273  msg->SetIsMarshalled( false );
1274  return st;
1275  }
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message msg,
uint16_t  reqType 
)
static

Unmarshall the body of the status response.

Definition at line 1358 of file XrdClXRootDTransport.cc.

1359  {
1360  //--------------------------------------------------------------------------
1361  // Calculate the crc32c before the unmarshaling the body!
1362  //--------------------------------------------------------------------------
1363  ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1364  char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1365  size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1366  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1367 
1368  size_t stlen = sizeof( ServerResponseStatus );
1369  switch( reqType )
1370  {
1371  case kXR_pgread:
1372  {
1373  stlen += sizeof( ServerResponseBody_pgRead );
1374  break;
1375  }
1376 
1377  case kXR_pgwrite:
1378  {
1379  stlen += sizeof( ServerResponseBody_pgWrite );
1380  break;
1381  }
1382  }
1383 
1384  if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1385  "kXR_status: invalid message size." );
1386 
1387  rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1388  rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1389 
1390  switch( reqType )
1391  {
1392  case kXR_pgread:
1393  {
1394  ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1395  pgrdbdy->offset = ntohll( pgrdbdy->offset );
1396  break;
1397  }
1398 
1399  case kXR_pgwrite:
1400  {
1401  ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1402  pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1403  break;
1404  }
1405  }
1406 
1407  //--------------------------------------------------------------------------
1408  // Do the integrity checks
1409  //--------------------------------------------------------------------------
1410  if( crcval != rspst->bdy.crc32c )
1411  {
1412  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1413  "corrupted (crc32c integrity check failed)." );
1414  }
1415 
1416  if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1417  rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1418  {
1419  return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1420  "(stream ID mismatch)." );
1421  }
1422 
1423 
1424 
1425  if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1426  {
1427  return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1428  "(request ID mismatch)." );
1429  }
1430 
1431  return XRootDStatus();
1432  }
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1261

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseHeader::streamid, and ServerResponseBody_Status::streamID.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1823 of file XrdClXRootDTransport.cc.

1824  {
1825  XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1826  pSecUnloadHandler->unloaded = true;
1827  }

References XrdCl::PluginUnloadHandler::lock, and XrdCl::PluginUnloadHandler::unloaded.

Friends And Related Function Documentation

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.


The documentation for this class was generated from the following files: