XRootD
XrdCl::XRootDTransport Class Reference

XRootD transport handler.

#include <XrdClXRootDTransport.hh>


Public Member Functions

 XRootDTransport ()
 Constructor.
 
 ~XRootDTransport ()
 Destructor.
 
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel.
 
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected, do the cleanups.
 
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel.
 
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream.
 
virtual XRootDStatus GetBody (Message &message, Socket *socket)
 
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
 
virtual XRootDStatus GetMore (Message &message, Socket *socket)
 
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message.
 
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message.
 
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake.
 
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel.
 
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
 
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected.
 
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action.
 
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent.
 
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual bool NeedControlConnection ()
 
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel.
 
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return the number of substreams per stream that should be created.
 
virtual void WaitBeforeExit ()
 Wait until the program can safely exit.
 
- Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()
 

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message.
 
static void LogErrorResponse (const Message &msg)
 Log server error response.
 
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message.
 
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message.
 
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams.
 
static void SetDescription (Message *msg)
 Set the description of a message.
 
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite.
 
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message.
 
static void UnMarshallHeader (Message &msg)
 Unmarshall the header of the incoming message.
 
static XRootDStatus UnMarshallRequest (Message *msg)
 
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response.
 

Friends

struct PluginUnloadHandler
 

Additional Inherited Members

- Public Types inherited from XrdCl::TransportHandler
enum  StreamAction {
  NoAction = 0x0000 ,
  DigestMsg = 0x0001 ,
  AbortStream = 0x0002 ,
  CloseStream = 0x0004 ,
  ResumeStream = 0x0008 ,
  HoldStream = 0x0010 ,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages.
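
The StreamAction values are distinct bits, which suggests that a handler returning uint32_t (such as MessageReceived) can report several actions in one value. A minimal sketch under that assumption follows. The enum is redeclared locally with the values listed above, and HasAction and Combine are hypothetical helpers that are not part of XrdCl.

```cpp
#include <cassert>
#include <cstdint>

// Local copy of the values listed above, for illustration only.
enum StreamAction : uint32_t
{
  NoAction     = 0x0000,
  DigestMsg    = 0x0001,
  AbortStream  = 0x0002,
  CloseStream  = 0x0004,
  ResumeStream = 0x0008,
  HoldStream   = 0x0010,
  RequestClose = 0x0020
};

// Hypothetical helper: true if the bit for `action` is set in `mask`.
inline bool HasAction( uint32_t mask, StreamAction action )
{
  return ( mask & action ) != 0;
}

// Hypothetical helper: combine two actions into one bit mask.
inline uint32_t Combine( StreamAction a, StreamAction b )
{
  return static_cast<uint32_t>( a ) | static_cast<uint32_t>( b );
}
```

A caller would test the returned mask one bit at a time, for example HasAction( mask, DigestMsg ), rather than comparing it for equality with a single enumerator.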
 

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 291 of file XrdClXRootDTransport.cc.

291  :
292  pSecUnloadHandler( new PluginUnloadHandler() )
293  {
294  }

◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 299 of file XrdClXRootDTransport.cc.

300  {
301  delete pSecUnloadHandler; pSecUnloadHandler = 0;
302  }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject channelData)
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1743 of file XrdClXRootDTransport.cc.

1744  {
1745  XRootDChannelInfo *info = 0;
1746  channelData.Get( info );
1747  if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1748  info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1749  }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().
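
The listing checks the counter and decrements it in two separate atomic operations. Whether two callers can race between the check and the decrement depends on how calls are serialized, which this page does not state. As an illustration only, the sketch below shows a single-step "decrement unless zero" built on compare-exchange. DecrementIfPositive is a hypothetical helper, and the uint32_t counter type is an assumption, not taken from XRootDChannelInfo.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper: decrement `counter` unless it is already zero.
// Returns true if a decrement took place.
inline bool DecrementIfPositive( std::atomic<uint32_t> &counter )
{
  uint32_t current = counter.load( std::memory_order_relaxed );
  while( current > 0 )
  {
    // On failure compare_exchange_weak stores the latest value in
    // `current`, so the loop re-checks the condition with fresh data.
    if( counter.compare_exchange_weak( current, current - 1,
                                       std::memory_order_relaxed ) )
      return true;
  }
  return false;
}
```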


◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject channelData,
uint16_t  subStreamId 
)
virtual

The stream has been disconnected, do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1485 of file XrdClXRootDTransport.cc.

1487  {
1488  XRootDChannelInfo *info = 0;
1489  channelData.Get( info );
1490  XrdSysMutexHelper scopedLock( info->mutex );
1491 
1492  CleanUpProtection( info );
1493 
1494  if( !info->stream.empty() )
1495  {
1496  XRootDStreamInfo &sInfo = info->stream[subStreamId];
1497  sInfo.status = XRootDStreamInfo::Disconnected;
1498  }
1499 
1500  if( subStreamId == 0 )
1501  {
1502  info->sidManager->ReleaseAllTimedOut();
1503  info->sentOpens.clear();
1504  info->sentCloses.clear();
1505  info->openFiles = 0;
1506  info->waitBarrier = 0;
1507  }
1508  }

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::waitBarrier.
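
The cleanup has two levels: every substream is marked as disconnected, while channel-wide state is reset only when substream 0 goes down. The sketch below mirrors that shape with simplified stand-in types. StreamInfo and ChannelInfo are hypothetical, std::mutex replaces XrdSysMutexHelper, and the bounds check is an addition of this sketch (the listing only tests whether the vector is empty).

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Simplified stand-ins for XRootDStreamInfo / XRootDChannelInfo.
struct StreamInfo
{
  enum Status { Disconnected, Connected };
  Status status = Connected;
};

struct ChannelInfo
{
  std::mutex              mutex;
  std::vector<StreamInfo> stream;
  uint32_t                openFiles = 0;
};

inline void Disconnect( ChannelInfo &info, uint16_t subStreamId )
{
  std::lock_guard<std::mutex> lock( info.mutex );

  // Per-substream cleanup (bounds check added in this sketch).
  if( subStreamId < info.stream.size() )
    info.stream[subStreamId].status = StreamInfo::Disconnected;

  // Channel-wide cleanup happens only for substream 0.
  if( subStreamId == 0 )
    info.openFiles = 0;
}
```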


◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject channelData)
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 460 of file XrdClXRootDTransport.cc.

461  {
462  }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char *  msg,
std::ostringstream &  o 
)
static

Get the description of a message.

Definition at line 2904 of file XrdClXRootDTransport.cc.

2905  {
2906  Log *log = DefaultEnv::GetLog();
2907  if( log->GetLevel() < Log::ErrorMsg )
2908  return;
2909 
2910  ClientRequestHdr *req = (ClientRequestHdr *)msg;
2911  switch( req->requestid )
2912  {
2913  //------------------------------------------------------------------------
2914  // kXR_open
2915  //------------------------------------------------------------------------
2916  case kXR_open:
2917  {
2918  ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2919  o << "kXR_open (";
2920  char *fn = GetDataAsString( msg );
2921  o << "file: " << fn << ", ";
2922  delete [] fn;
2923  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2924  o << std::setbase(10);
2925  o << "flags: ";
2926  if( sreq->options == 0 )
2927  o << "none";
2928  else
2929  {
2930  if( sreq->options & kXR_delete )
2931  o << "kXR_delete ";
2932  if( sreq->options & kXR_force )
2933  o << "kXR_force ";
2934  if( sreq->options & kXR_mkpath )
2935  o << "kXR_mkpath ";
2936  if( sreq->options & kXR_new )
2937  o << "kXR_new ";
2938  if( sreq->options & kXR_nowait )
2939  o << "kXR_nowait ";
2940  if( sreq->options & kXR_open_apnd )
2941  o << "kXR_open_apnd ";
2942  if( sreq->options & kXR_open_read )
2943  o << "kXR_open_read ";
2944  if( sreq->options & kXR_open_updt )
2945  o << "kXR_open_updt ";
2946  if( sreq->options & kXR_posc )
2947  o << "kXR_posc ";
2948  if( sreq->options & kXR_refresh )
2949  o << "kXR_refresh ";
2950  if( sreq->options & kXR_replica )
2951  o << "kXR_replica ";
2952  if( sreq->options & kXR_seqio )
2953  o << "kXR_seqio ";
2954  if( sreq->options & kXR_async )
2955  o << "kXR_async ";
2956  if( sreq->options & kXR_retstat )
2957  o << "kXR_retstat ";
2958  }
2959  o << ")";
2960  break;
2961  }
2962 
2963  //------------------------------------------------------------------------
2964  // kXR_close
2965  //------------------------------------------------------------------------
2966  case kXR_close:
2967  {
2968  ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
2969  o << "kXR_close (";
2970  o << "handle: " << FileHandleToStr( sreq->fhandle );
2971  o << ")";
2972  break;
2973  }
2974 
2975  //------------------------------------------------------------------------
2976  // kXR_stat
2977  //------------------------------------------------------------------------
2978  case kXR_stat:
2979  {
2980  ClientStatRequest *sreq = (ClientStatRequest *)msg;
2981  o << "kXR_stat (";
2982  if( sreq->dlen )
2983  {
2984  char *fn = GetDataAsString( msg );
2985  o << "path: " << fn << ", ";
2986  delete [] fn;
2987  }
2988  else
2989  {
2990  o << "handle: " << FileHandleToStr( sreq->fhandle );
2991  o << ", ";
2992  }
2993  o << "flags: ";
2994  if( sreq->options == 0 )
2995  o << "none";
2996  else
2997  {
2998  if( sreq->options & kXR_vfs )
2999  o << "kXR_vfs";
3000  }
3001  o << ")";
3002  break;
3003  }
3004 
3005  //------------------------------------------------------------------------
3006  // kXR_read
3007  //------------------------------------------------------------------------
3008  case kXR_read:
3009  {
3010  ClientReadRequest *sreq = (ClientReadRequest *)msg;
3011  o << "kXR_read (";
3012  o << "handle: " << FileHandleToStr( sreq->fhandle );
3013  o << std::setbase(10);
3014  o << ", ";
3015  o << "offset: " << sreq->offset << ", ";
3016  o << "size: " << sreq->rlen << ")";
3017  break;
3018  }
3019 
3020  //------------------------------------------------------------------------
3021  // kXR_pgread
3022  //------------------------------------------------------------------------
3023  case kXR_pgread:
3024  {
3025  ClientPgReadRequest *sreq = (ClientPgReadRequest *)msg;
3026  o << "kXR_pgread (";
3027  o << "handle: " << FileHandleToStr( sreq->fhandle );
3028  o << std::setbase(10);
3029  o << ", ";
3030  o << "offset: " << sreq->offset << ", ";
3031  o << "size: " << sreq->rlen << ")";
3032  break;
3033  }
3034 
3035  //------------------------------------------------------------------------
3036  // kXR_write
3037  //------------------------------------------------------------------------
3038  case kXR_write:
3039  {
3040  ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3041  o << "kXR_write (";
3042  o << "handle: " << FileHandleToStr( sreq->fhandle );
3043  o << std::setbase(10);
3044  o << ", ";
3045  o << "offset: " << sreq->offset << ", ";
3046  o << "size: " << sreq->dlen << ")";
3047  break;
3048  }
3049 
3050  //------------------------------------------------------------------------
3051  // kXR_pgwrite
3052  //------------------------------------------------------------------------
3053  case kXR_pgwrite:
3054  {
3055  ClientPgWriteRequest *sreq = (ClientPgWriteRequest *)msg;
3056  o << "kXR_pgwrite (";
3057  o << "handle: " << FileHandleToStr( sreq->fhandle );
3058  o << std::setbase(10);
3059  o << ", ";
3060  o << "offset: " << sreq->offset << ", ";
3061  o << "size: " << sreq->dlen << ")";
3062  break;
3063  }
3064 
3065  //------------------------------------------------------------------------
3066  // kXR_sync
3067  //------------------------------------------------------------------------
3068  case kXR_sync:
3069  {
3070  ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3071  o << "kXR_sync (";
3072  o << "handle: " << FileHandleToStr( sreq->fhandle );
3073  o << ")";
3074  break;
3075  }
3076 
3077  //------------------------------------------------------------------------
3078  // kXR_truncate
3079  //------------------------------------------------------------------------
3080  case kXR_truncate:
3081  {
3082  ClientTruncateRequest *sreq = (ClientTruncateRequest *)msg;
3083  o << "kXR_truncate (";
3084  if( !sreq->dlen )
3085  o << "handle: " << FileHandleToStr( sreq->fhandle );
3086  else
3087  {
3088  char *fn = GetDataAsString( msg );
3089  o << "file: " << fn;
3090  delete [] fn;
3091  }
3092  o << std::setbase(10);
3093  o << ", ";
3094  o << "offset: " << sreq->offset;
3095  o << ")";
3096  break;
3097  }
3098 
3099  //------------------------------------------------------------------------
3100  // kXR_readv
3101  //------------------------------------------------------------------------
3102  case kXR_readv:
3103  {
3104  unsigned char *fhandle = 0;
3105  o << "kXR_readv (";
3106 
3107  o << "handle: ";
3108  readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3109  fhandle = dataChunk[0].fhandle;
3110  if( fhandle )
3111  o << FileHandleToStr( fhandle );
3112  else
3113  o << "unknown";
3114  o << ", ";
3115  o << std::setbase(10);
3116  o << "chunks: [";
3117  uint64_t size = 0;
3118  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3119  {
3120  size += dataChunk[i].rlen;
3121  o << "(offset: " << dataChunk[i].offset;
3122  o << ", size: " << dataChunk[i].rlen << "); ";
3123  }
3124  o << "], ";
3125  o << "total size: " << size << ")";
3126  break;
3127  }
3128 
3129  //------------------------------------------------------------------------
3130  // kXR_writev
3131  //------------------------------------------------------------------------
3132  case kXR_writev:
3133  {
3134  unsigned char *fhandle = 0;
3135  o << "kXR_writev (";
3136 
3137  XrdProto::write_list *wrtList =
3138  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3139  uint64_t size = 0;
3140  uint32_t numChunks = 0;
3141  for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3142  {
3143  fhandle = wrtList[i].fhandle;
3144  size += wrtList[i].wlen;
3145  ++numChunks;
3146  }
3147  o << "handle: ";
3148  if( fhandle )
3149  o << FileHandleToStr( fhandle );
3150  else
3151  o << "unknown";
3152  o << ", ";
3153  o << std::setbase(10);
3154  o << "chunks: " << numChunks << ", ";
3155  o << "total size: " << size << ")";
3156  break;
3157  }
3158 
3159  //------------------------------------------------------------------------
3160  // kXR_locate
3161  //------------------------------------------------------------------------
3162  case kXR_locate:
3163  {
3164  ClientLocateRequest *sreq = (ClientLocateRequest *)msg;
3165  char *fn = GetDataAsString( msg );
3166  o << "kXR_locate (";
3167  o << "path: " << fn << ", ";
3168  delete [] fn;
3169  o << "flags: ";
3170  if( sreq->options == 0 )
3171  o << "none";
3172  else
3173  {
3174  if( sreq->options & kXR_refresh )
3175  o << "kXR_refresh ";
3176  if( sreq->options & kXR_prefname )
3177  o << "kXR_prefname ";
3178  if( sreq->options & kXR_nowait )
3179  o << "kXR_nowait ";
3180  if( sreq->options & kXR_force )
3181  o << "kXR_force ";
3182  if( sreq->options & kXR_compress )
3183  o << "kXR_compress ";
3184  }
3185  o << ")";
3186  break;
3187  }
3188 
3189  //------------------------------------------------------------------------
3190  // kXR_mv
3191  //------------------------------------------------------------------------
3192  case kXR_mv:
3193  {
3194  ClientMvRequest *sreq = (ClientMvRequest *)msg;
3195  o << "kXR_mv (";
3196  o << "source: ";
3197  o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3198  o << ", ";
3199  o << "destination: ";
3200  o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3201  o << ")";
3202  break;
3203  }
3204 
3205  //------------------------------------------------------------------------
3206  // kXR_query
3207  //------------------------------------------------------------------------
3208  case kXR_query:
3209  {
3210  ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3211  o << "kXR_query (";
3212  o << "code: ";
3213  switch( sreq->infotype )
3214  {
3215  case kXR_Qconfig: o << "kXR_Qconfig"; break;
3216  case kXR_Qckscan: o << "kXR_Qckscan"; break;
3217  case kXR_Qcksum: o << "kXR_Qcksum"; break;
3218  case kXR_Qopaque: o << "kXR_Qopaque"; break;
3219  case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3220  case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3221  case kXR_QPrep: o << "kXR_QPrep"; break;
3222  case kXR_Qspace: o << "kXR_Qspace"; break;
3223  case kXR_QStats: o << "kXR_QStats"; break;
3224  case kXR_Qvisa: o << "kXR_Qvisa"; break;
3225  case kXR_Qxattr: o << "kXR_Qxattr"; break;
3226  default: o << sreq->infotype; break;
3227  }
3228  o << ", ";
3229 
3230  if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3231  {
3232  o << "handle: " << FileHandleToStr( sreq->fhandle );
3233  o << ", ";
3234  }
3235 
3236  o << "arg length: " << sreq->dlen << ")";
3237  break;
3238  }
3239 
3240  //------------------------------------------------------------------------
3241  // kXR_rm
3242  //------------------------------------------------------------------------
3243  case kXR_rm:
3244  {
3245  o << "kXR_rm (";
3246  char *fn = GetDataAsString( msg );
3247  o << "path: " << fn << ")";
3248  delete [] fn;
3249  break;
3250  }
3251 
3252  //------------------------------------------------------------------------
3253  // kXR_mkdir
3254  //------------------------------------------------------------------------
3255  case kXR_mkdir:
3256  {
3257  ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3258  o << "kXR_mkdir (";
3259  char *fn = GetDataAsString( msg );
3260  o << "path: " << fn << ", ";
3261  delete [] fn;
3262  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3263  o << std::setbase(10);
3264  o << "flags: ";
3265  if( sreq->options[0] == 0 )
3266  o << "none";
3267  else
3268  {
3269  if( sreq->options[0] & kXR_mkdirpath )
3270  o << "kXR_mkdirpath";
3271  }
3272  o << ")";
3273  break;
3274  }
3275 
3276  //------------------------------------------------------------------------
3277  // kXR_rmdir
3278  //------------------------------------------------------------------------
3279  case kXR_rmdir:
3280  {
3281  o << "kXR_rmdir (";
3282  char *fn = GetDataAsString( msg );
3283  o << "path: " << fn << ")";
3284  delete [] fn;
3285  break;
3286  }
3287 
3288  //------------------------------------------------------------------------
3289  // kXR_chmod
3290  //------------------------------------------------------------------------
3291  case kXR_chmod:
3292  {
3293  ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3294  o << "kXR_chmod (";
3295  char *fn = GetDataAsString( msg );
3296  o << "path: " << fn << ", ";
3297  delete [] fn;
3298  o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3299  break;
3300  }
3301 
3302  //------------------------------------------------------------------------
3303  // kXR_ping
3304  //------------------------------------------------------------------------
3305  case kXR_ping:
3306  {
3307  o << "kXR_ping ()";
3308  break;
3309  }
3310 
3311  //------------------------------------------------------------------------
3312  // kXR_protocol
3313  //------------------------------------------------------------------------
3314  case kXR_protocol:
3315  {
3316  ClientProtocolRequest *sreq = (ClientProtocolRequest *)msg;
3317  o << "kXR_protocol (";
3318  o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3319  break;
3320  }
3321 
3322  //------------------------------------------------------------------------
3323  // kXR_dirlist
3324  //------------------------------------------------------------------------
3325  case kXR_dirlist:
3326  {
3327  o << "kXR_dirlist (";
3328  char *fn = GetDataAsString( msg );
3329  o << "path: " << fn << ")";
3330  delete [] fn;
3331  break;
3332  }
3333 
3334  //------------------------------------------------------------------------
3335  // kXR_set
3336  //------------------------------------------------------------------------
3337  case kXR_set:
3338  {
3339  o << "kXR_set (";
3340  char *fn = GetDataAsString( msg );
3341  o << "data: " << fn << ")";
3342  delete [] fn;
3343  break;
3344  }
3345 
3346  //------------------------------------------------------------------------
3347  // kXR_prepare
3348  //------------------------------------------------------------------------
3349  case kXR_prepare:
3350  {
3351  ClientPrepareRequest *sreq = (ClientPrepareRequest *)msg;
3352  o << "kXR_prepare (";
3353  o << "flags: ";
3354 
3355  if( sreq->options == 0 )
3356  o << "none";
3357  else
3358  {
3359  if( sreq->options & kXR_stage )
3360  o << "kXR_stage ";
3361  if( sreq->options & kXR_wmode )
3362  o << "kXR_wmode ";
3363  if( sreq->options & kXR_coloc )
3364  o << "kXR_coloc ";
3365  if( sreq->options & kXR_fresh )
3366  o << "kXR_fresh ";
3367  }
3368 
3369  o << ", priority: " << (int) sreq->prty << ", ";
3370 
3371  char *fn = GetDataAsString( msg );
3372  char *cursor;
3373  for( cursor = fn; *cursor; ++cursor )
3374  if( *cursor == '\n' ) *cursor = ' ';
3375 
3376  o << "paths: " << fn << ")";
3377  delete [] fn;
3378  break;
3379  }
3380 
3381  case kXR_chkpoint:
3382  {
3383  ClientChkPointRequest *sreq = (ClientChkPointRequest *)msg;
3384  o << "kXR_chkpoint (";
3385  o << "opcode: ";
3386  if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3387  else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3388  else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3389  else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3390  else if( sreq->opcode == kXR_ckpXeq )
3391  {
3392  o << "kXR_ckpXeq) ";
3393  // In this case our request body will be one of kXR_pgwrite,
3394  // kXR_truncate, kXR_write, or kXR_writev request.
3395  GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3396  }
3397 
3398  break;
3399  }
3400 
3401  //------------------------------------------------------------------------
3402  // Default
3403  //------------------------------------------------------------------------
3404  default:
3405  {
3406  o << "kXR_unknown (length: " << req->dlen << ")";
3407  break;
3408  }
3409  };
3410  }

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdCl::Log::ErrorMsg, ClientCloseRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, readahead_list::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, XrdProto::write_list::fhandle, XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, ClientChkPointRequest::opcode, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, ClientPrepareRequest::options, ClientStatRequest::options, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, and 
XrdProto::write_list::wlen.

Referenced by SetDescription().
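
Most cases in GenerateDescription follow one pattern: print numeric fields with an explicit base, restore decimal output, then decode an option bit mask flag by flag. The sketch below reproduces that pattern in isolation. DescribeOpen, kFlagNew and kFlagRead are hypothetical names with made-up values, not the kXR_* constants from XProtocol.hh.

```cpp
#include <cassert>
#include <cstdint>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical flag values, chosen for this example only.
const uint16_t kFlagNew  = 0x0008;
const uint16_t kFlagRead = 0x0010;

// Format "mode: 0<octal>, flags: ..." in the style of the kXR_open case.
inline std::string DescribeOpen( uint16_t mode, uint16_t options )
{
  std::ostringstream o;
  o << "mode: 0" << std::setbase( 8 ) << mode << ", ";
  o << std::setbase( 10 );  // restore decimal for any later fields
  o << "flags: ";
  if( options == 0 )
    o << "none";
  else
  {
    // Each set bit contributes its name followed by a space.
    if( options & kFlagNew )  o << "new ";
    if( options & kFlagRead ) o << "read ";
  }
  return o.str();
}
```

The base set by std::setbase stays in effect on the stream until it is changed again, which is why the listing resets it to 10 after printing a mode.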


◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL url,
AnyObject channelData 
)
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1841 of file XrdClXRootDTransport.cc.

1843  {
1844  XRootDChannelInfo *info = 0;
1845  channelData.Get( info );
1846  if( !bool( info->bindSelector ) )
1847  return url;
1848 
1849  return URL( info->bindSelector->Get() );
1850  }

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().


◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message message,
Socket socket 
)
virtual

Read the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message  the message buffer containing the header
socket   the socket
Returns
stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 347 of file XrdClXRootDTransport.cc.

348  {
349  //--------------------------------------------------------------------------
350  // Retrieve the body
351  //--------------------------------------------------------------------------
352  size_t leftToBeRead = 0;
353  uint32_t bodySize = 0;
354  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
355  bodySize = rsphdr->dlen;
356 
357  if( message.GetSize() < bodySize + 8 )
358  message.ReAllocate( bodySize + 8 );
359 
360  leftToBeRead = bodySize-(message.GetCursor()-8);
361  while( leftToBeRead )
362  {
363  int bytesRead = 0;
364  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365 
366  if( !status.IsOK() || status.code == suRetry )
367  return status;
368 
369  leftToBeRead -= bytesRead;
370  message.AdvanceCursor( bytesRead );
371  }
372 
373  return XRootDStatus( stOK, suDone );
374  }

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.
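
GetBody can be called repeatedly because the message cursor records how many bytes have already been stored. Each call recomputes the remaining byte count from the cursor and the 8-byte header size. The sketch below shows that resumable loop against a fake non-blocking source. FakeSocket, Msg and ReadResult are hypothetical stand-ins for XrdCl::Socket, XrdCl::Message and XRootDStatus.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

enum ReadResult { Done, Retry };

// Hypothetical non-blocking source: delivers at most `chunk` bytes per
// call and reports Retry on every other call to mimic an empty socket.
struct FakeSocket
{
  std::vector<char> data;
  size_t            pos   = 0;
  size_t            chunk = 3;
  bool              ready = true;

  ReadResult Read( char *dst, size_t want, int &bytesRead )
  {
    bytesRead = 0;
    if( !ready ) { ready = true; return Retry; }
    size_t n = std::min( { want, chunk, data.size() - pos } );
    std::memcpy( dst, data.data() + pos, n );
    pos      += n;
    bytesRead = static_cast<int>( n );
    ready     = false;
    return Done;
  }
};

struct Msg
{
  std::vector<char> buffer;
  size_t            cursor = 0;
};

const size_t kHeaderSize = 8;

// Resumable body read: returns Retry when the source runs dry and Done
// once `bodySize` bytes have been stored after the header.
inline ReadResult GetBody( Msg &msg, FakeSocket &sock, uint32_t bodySize )
{
  assert( msg.cursor >= kHeaderSize );  // the header must already be read
  if( msg.buffer.size() < bodySize + kHeaderSize )
    msg.buffer.resize( bodySize + kHeaderSize );

  size_t left = bodySize - ( msg.cursor - kHeaderSize );
  while( left )
  {
    int n = 0;
    if( sock.Read( msg.buffer.data() + msg.cursor, left, n ) == Retry )
      return Retry;
    left       -= static_cast<size_t>( n );
    msg.cursor += static_cast<size_t>( n );
  }
  return Done;
}
```

A caller keeps the same Msg object between invocations and calls GetBody again whenever the source signals that more data is available.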


◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message message,
Socket socket 
)
virtual

Read a message header from the socket. The socket is non-blocking, so if there is not enough data the function should return suRetry, in which case it will be called again when more data arrives, with the data previously read stored in the message buffer.

Parameters
message  the message buffer
socket   the socket
Returns
stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 307 of file XrdClXRootDTransport.cc.

308  {
309  //--------------------------------------------------------------------------
310  // A new message - allocate the space needed for the header
311  //--------------------------------------------------------------------------
312  if( message.GetCursor() == 0 && message.GetSize() < 8 )
313  message.Allocate( 8 );
314 
315  //--------------------------------------------------------------------------
316  // Read the message header
317  //--------------------------------------------------------------------------
318  if( message.GetCursor() < 8 )
319  {
320  size_t leftToBeRead = 8 - message.GetCursor();
321  while( leftToBeRead )
322  {
323  int bytesRead = 0;
324  XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325  leftToBeRead, bytesRead );
326  if( !status.IsOK() || status.code == suRetry )
327  return status;
328 
329  leftToBeRead -= bytesRead;
330  message.AdvanceCursor( bytesRead );
331  }
332  UnMarshallHeader( message );
333 
334  uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335  Log *log = DefaultEnv::GetLog();
336  log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337  "body", &message, bodySize );
338 
339  return XRootDStatus( stOK, suDone );
340  }
341  return XRootDStatus( stError, errInternal );
342  }
static void UnMarshallHeader(Message &msg)
Unmarshall the header of the incoming message.
const uint64_t XRootDTransportMsg
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.
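The retry contract described for GetHeader (return suRetry on a short read, resume from the cursor on the next call) can be sketched outside XrdCl. This is a minimal illustration under stated assumptions, not the library's implementation: FakeSocket, Buf, ReadStatus and ReadHeader are hypothetical names, and unlike GetHeader the sketch neither unmarshals the header nor treats a call on an already complete header as an error.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-ins; none of these are XrdCl types.
enum class ReadStatus { Done, Retry };

// A fake non-blocking socket: Read() hands out whatever has been fed so far.
struct FakeSocket
{
  std::vector<char> pending;
  std::size_t       pos = 0;

  void Feed( const char *data, std::size_t len )
  {
    pending.insert( pending.end(), data, data + len );
  }

  // Copies up to `want` bytes; reports Retry when nothing is available.
  ReadStatus Read( char *dst, std::size_t want, std::size_t &got )
  {
    got = std::min( want, pending.size() - pos );
    if( got == 0 ) return ReadStatus::Retry;
    std::memcpy( dst, pending.data() + pos, got );
    pos += got;
    return ReadStatus::Done;
  }
};

// Message buffer with a cursor marking how much has been filled.
struct Buf
{
  std::vector<char> bytes;
  std::size_t       cursor = 0;
};

constexpr std::size_t kHeaderSize = 8;

// Reads until the 8-byte header is complete. On Retry the cursor keeps the
// progress, so calling again continues where the previous call stopped.
ReadStatus ReadHeader( Buf &msg, FakeSocket &sock )
{
  assert( msg.cursor <= kHeaderSize );
  if( msg.bytes.size() < kHeaderSize ) msg.bytes.resize( kHeaderSize );
  while( msg.cursor < kHeaderSize )
  {
    std::size_t got = 0;
    ReadStatus st = sock.Read( msg.bytes.data() + msg.cursor,
                               kHeaderSize - msg.cursor, got );
    if( st == ReadStatus::Retry ) return st;
    msg.cursor += got;
  }
  return ReadStatus::Done;
}
```

Feeding the fake socket five bytes and then three makes the first call report Retry with the cursor at 5, and the second call finish the header.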

◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message & message,
Socket * socket 
)
virtual

Read more of the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message  the message buffer containing the header
socket   the socket
Returns
stOK & suDone if the whole message has been processed
stOK & suRetry if more data is needed
stError on failure

Implements XrdCl::TransportHandler.

Definition at line 379 of file XrdClXRootDTransport.cc.

380  {
381  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
382  if( rsphdr->status != kXR_status )
383  return XRootDStatus( stError, errInvalidOp );
384 
385  //--------------------------------------------------------------------------
386  // In case of non kXR_status responses we read all the response, including
387  // data. For kXR_status responses we first read only the remainder of the
388  // header. The header must then be unmarshalled, and then a second call to
389  // GetMore (repeated for suRetry as needed) will read the data.
390  //--------------------------------------------------------------------------
391 
392  uint32_t bodySize = rsphdr->dlen;
393  if( bodySize+8 < sizeof( ServerResponseStatus ) )
394  return XRootDStatus( stError, errInvalidMessage, 0,
395  "kXR_status: invalid message size." );
396 
397  ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
398  bodySize += rspst->bdy.dlen;
399 
400  if( message.GetSize() < bodySize + 8 )
401  message.ReAllocate( bodySize + 8 );
402 
403  size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404  while( leftToBeRead )
405  {
406  int bytesRead = 0;
407  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408 
409  if( !status.IsOK() || status.code == suRetry )
410  return status;
411 
412  leftToBeRead -= bytesRead;
413  message.AdvanceCursor( bytesRead );
414  }
415 
416  // Unmarshal the message body
417  Log *log = DefaultEnv::GetLog();
418  XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
419  if( !st.IsOK() && st.code == errDataError )
420  {
421  log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422  st.GetErrorMessage().c_str() );
423  return st;
424  }
425 
426  if( !st.IsOK() )
427  {
428  log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429  &message );
430  return st;
431  }
432 
433  return XRootDStatus( stOK, suDone );
434  }
@ kXR_status
Definition: XProtocol.hh:907
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.
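The size arithmetic used by GetBody and GetMore can be isolated in a short sketch. The function names below are hypothetical; the 8-byte header size and the expression bodySize - (cursor - 8) are taken from the listings, and for kXR_status responses the total is assumed to be the length announced in the response header plus the length announced in the status body, as the GetMore listing computes it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr std::size_t kHeaderSize = 8;

// Total number of body bytes for a kXR_status response: the part announced
// in the response header plus the part announced in the status body.
std::uint32_t StatusBodySize( std::uint32_t hdrDlen, std::uint32_t bdyDlen )
{
  return hdrDlen + bdyDlen;
}

// Bytes still to be read, given the cursor (bytes already in the buffer,
// header included).
std::size_t LeftToBeRead( std::uint32_t bodySize, std::size_t cursor )
{
  assert( cursor >= kHeaderSize );             // header must be complete
  assert( cursor - kHeaderSize <= bodySize );  // cannot have read too much
  return bodySize - ( cursor - kHeaderSize );
}
```

The two assertions make explicit the preconditions that the unsigned subtraction silently relies on.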

◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *&  sign,
AnyObject & channelData 
)
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1703 of file XrdClXRootDTransport.cc.

1704  {
1705  XRootDChannelInfo *info = 0;
1706  channelData.Get( info );
1707  return GetSignature( toSign, sign, info );
1708  }
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.

References XrdCl::AnyObject::Get().

◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *&  sign,
XRootDChannelInfo * info 
)
virtual

Get signature for given message.

Definition at line 1713 of file XrdClXRootDTransport.cc.

1716  {
1717  XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1718  if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1719 
1720  ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1721  if( !info ) return Status( stError, errInternal );
1722  if( info->protection )
1723  {
1724  SecurityRequest *newreq = 0;
1725  // check if we have to secure the request in the first place
1726  if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1727  // secure (sign/encrypt) the request
1728  int rc = info->protection->Secure( newreq, *thereq, 0 );
1729  // there was an error
1730  if( rc < 0 )
1731  return Status( stError, errInternal, -rc );
1732 
1733  sign = new Message();
1734  sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1735  }
1736 
1737  return Status();
1738  }
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), XrdCl::PluginUnloadHandler::lock, NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), XrdCl::stError, and XrdCl::PluginUnloadHandler::unloaded.

◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData * handShakeData,
AnyObject & channelData 
)
virtual

HandShake.

Implements XrdCl::TransportHandler.

Definition at line 467 of file XrdClXRootDTransport.cc.

469  {
470  XRootDChannelInfo *info = 0;
471  channelData.Get( info );
472  XrdSysMutexHelper scopedLock( info->mutex );
473 
474  if( info->stream.size() <= handShakeData->subStreamId )
475  {
476  Log *log = DefaultEnv::GetLog();
477  log->Error( XRootDTransportMsg,
478  "[%s] Internal error: not enough substreams",
479  handShakeData->streamName.c_str() );
480  return XRootDStatus( stFatal, errInternal );
481  }
482 
483  if( handShakeData->subStreamId == 0 )
484  {
485  info->streamName = handShakeData->streamName;
486  return HandShakeMain( handShakeData, channelData );
487  }
488  return HandShakeParallel( handShakeData, channelData );
489  }
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData * handShakeData,
AnyObject & channelData 
)
virtual

Implements XrdCl::TransportHandler.

Definition at line 727 of file XrdClXRootDTransport.cc.

729  {
730  XRootDChannelInfo *info = 0;
731  channelData.Get( info );
732  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
733  return ( sInfo.status == XRootDStreamInfo::Connected );
734  }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.

◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL & url,
AnyObject & channelData 
)
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 439 of file XrdClXRootDTransport.cc.

441  {
442  XRootDChannelInfo *info = new XRootDChannelInfo( url );
443  XrdSysMutexHelper scopedLock( info->mutex );
444  channelData.Set( info );
445 
446  Env *env = DefaultEnv::GetEnv();
447  int streams = DefaultSubStreamsPerChannel;
448  env->GetInt( "SubStreamsPerChannel", streams );
449  if( streams < 1 ) streams = 1;
450  info->stream.resize( streams );
451  info->strmSelector.reset( new StreamSelector( streams ) );
452  info->encrypted = url.IsSecure();
453  info->istpc = url.IsTPC();
454  info->logintoken = url.GetLoginToken();
455  }
static Env * GetEnv()
Get default client environment.
const int DefaultSubStreamsPerChannel

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t  inactiveTime,
AnyObject & channelData 
)
virtual

Check whether the stream is broken, i.e. the TCP connection broke and the failure went undetected by the TCP stack.

Implements XrdCl::TransportHandler.

Definition at line 785 of file XrdClXRootDTransport.cc.

787  {
788  XRootDChannelInfo *info = 0;
789  channelData.Get( info );
790  Env *env = DefaultEnv::GetEnv();
791  Log *log = DefaultEnv::GetLog();
792 
793  int streamTimeout = DefaultStreamTimeout;
794  env->GetInt( "StreamTimeout", streamTimeout );
795 
796  XrdSysMutexHelper scopedLock( info->mutex );
797 
798  const time_t now = time(0);
799  const bool anySID =
800  info->sidManager->IsAnySIDOldAs( now - streamTimeout );
801 
802  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
803  "stream timeout: %d, any SID: %d, wait barrier: %s",
804  info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
805  anySID, Utils::TimeToString(info->waitBarrier).c_str() );
806 
807  if( inactiveTime < streamTimeout )
808  return Status();
809 
810  if( now < info->waitBarrier )
811  return Status();
812 
813  if( !anySID )
814  return Status();
815 
816  return Status( stError, errSocketTimeout );
817  }
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
Definition: XrdClUtils.cc:256
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
const int DefaultStreamTimeout

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.
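The decision made by IsStreamBroken reduces to three checks in sequence. The sketch below restates them as a pure function; StreamState and LooksBroken are hypothetical names, and the inputs that the real method reads from the channel data and the environment are simply passed in.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical flattened inputs; the real method gathers these itself.
struct StreamState
{
  std::time_t inactiveTime;   // seconds since the stream was last active
  std::time_t now;            // current time
  std::time_t waitBarrier;    // no timeout may be declared before this
  bool        anyOldRequest;  // some request is older than the timeout
};

// Mirrors the order of checks in the listing: recent activity, then the
// wait barrier, then whether any outstanding request is old enough.
bool LooksBroken( const StreamState &s, int streamTimeout )
{
  assert( streamTimeout >= 0 );
  if( s.inactiveTime < streamTimeout ) return false;
  if( s.now < s.waitBarrier )          return false;
  return s.anyOldRequest;
}
```

A stream is reported broken only when all three conditions point the same way, so a server that asked the client to wait is never timed out early.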

◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t  time,
AnyObject channelData 
)
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 739 of file XrdClXRootDTransport.cc.

741  {
742  XRootDChannelInfo *info = 0;
743  channelData.Get( info );
744  Env *env = DefaultEnv::GetEnv();
745  Log *log = DefaultEnv::GetLog();
746 
747  //--------------------------------------------------------------------------
748  // Check the TTL settings for the current server
749  //--------------------------------------------------------------------------
750  int ttl;
751  if( info->serverFlags & kXR_isServer )
752  {
753  ttl = DefaultDataServerTTL;
754  env->GetInt( "DataServerTTL", ttl );
755  }
756  else
757  {
758  ttl = DefaultLoadBalancerTTL;
759  env->GetInt( "LoadBalancerTTL", ttl );
760  }
761 
762  //--------------------------------------------------------------------------
763  // See whether we can give a go-ahead for the disconnection
764  //--------------------------------------------------------------------------
765  XrdSysMutexHelper scopedLock( info->mutex );
766  uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
767  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
768  "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769  info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
770  info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
771 
772  if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
773  return false;
774 
775  if( !allocatedSIDs && inactiveTime > ttl )
776  return true;
777 
778  return false;
779  }
#define kXR_isServer
Definition: XProtocol.hh:1157
const int DefaultLoadBalancerTTL
const int DefaultDataServerTTL

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.
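The go-ahead logic at the end of IsStreamTTLElapsed can be restated as a small predicate. ChannelUsage and TTLElapsed are hypothetical names for this sketch; the conditions are copied from the listing, including the && that blocks disconnection only when both the open-file count and the bound file-object count are non-zero.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical snapshot of the counters the listing consults.
struct ChannelUsage
{
  int openFiles;         // files currently open on the channel
  int boundFileObjects;  // File object instances bound to the channel
  int allocatedSIDs;     // stream IDs still in use by pending requests
};

// True when the idle stream may be disconnected.
bool TTLElapsed( const ChannelUsage &u, std::time_t inactiveTime, int ttl )
{
  assert( ttl >= 0 );
  if( u.openFiles != 0 && u.boundFileObjects != 0 )
    return false;
  return u.allocatedSIDs == 0 && inactiveTime > ttl;
}
```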

◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message & msg)
static

Log server error response.

Definition at line 1454 of file XrdClXRootDTransport.cc.

1455  {
1456  Log *log = DefaultEnv::GetLog();
1457  ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1458  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1459  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1460  log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1461  rsp->body.error.errnum, errmsg );
1462  delete [] errmsg;
1463  }
union ServerResponse::@0 body
ServerResponseHeader hdr
Definition: XProtocol.hh:1288

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.
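The dlen-4 arithmetic in LogErrorResponse suggests that the error body starts with a 4-byte error number followed by the message text. The sketch below extracts that text into a std::string under this assumption; ErrorText is a hypothetical helper, and unlike the listing it guards against bodies shorter than the error number.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// `body` points at the response body; assumed layout: 4 bytes of error
// number, then dlen - 4 bytes of message text.
std::string ErrorText( const char *body, std::uint32_t dlen )
{
  assert( body != nullptr );
  if( dlen <= 4 ) return std::string();     // no room for any text
  return std::string( body + 4, dlen - 4 ); // copy exactly the text bytes
}
```

Using std::string avoids the manual new[]/delete[] pair and the explicit terminator.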

◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char *  msg)
static

Marshal the outgoing message.

Definition at line 1050 of file XrdClXRootDTransport.cc.

1051  {
1052  ClientRequest *req = (ClientRequest*)msg;
1053  switch( req->header.requestid )
1054  {
1055  //------------------------------------------------------------------------
1056  // kXR_protocol
1057  //------------------------------------------------------------------------
1058  case kXR_protocol:
1059  req->protocol.clientpv = htonl( req->protocol.clientpv );
1060  break;
1061 
1062  //------------------------------------------------------------------------
1063  // kXR_login
1064  //------------------------------------------------------------------------
1065  case kXR_login:
1066  req->login.pid = htonl( req->login.pid );
1067  break;
1068 
1069  //------------------------------------------------------------------------
1070  // kXR_locate
1071  //------------------------------------------------------------------------
1072  case kXR_locate:
1073  req->locate.options = htons( req->locate.options );
1074  break;
1075 
1076  //------------------------------------------------------------------------
1077  // kXR_query
1078  //------------------------------------------------------------------------
1079  case kXR_query:
1080  req->query.infotype = htons( req->query.infotype );
1081  break;
1082 
1083  //------------------------------------------------------------------------
1084  // kXR_truncate
1085  //------------------------------------------------------------------------
1086  case kXR_truncate:
1087  req->truncate.offset = htonll( req->truncate.offset );
1088  break;
1089 
1090  //------------------------------------------------------------------------
1091  // kXR_mkdir
1092  //------------------------------------------------------------------------
1093  case kXR_mkdir:
1094  req->mkdir.mode = htons( req->mkdir.mode );
1095  break;
1096 
1097  //------------------------------------------------------------------------
1098  // kXR_chmod
1099  //------------------------------------------------------------------------
1100  case kXR_chmod:
1101  req->chmod.mode = htons( req->chmod.mode );
1102  break;
1103 
1104  //------------------------------------------------------------------------
1105  // kXR_open
1106  //------------------------------------------------------------------------
1107  case kXR_open:
1108  req->open.mode = htons( req->open.mode );
1109  req->open.options = htons( req->open.options );
1110  break;
1111 
1112  //------------------------------------------------------------------------
1113  // kXR_read
1114  //------------------------------------------------------------------------
1115  case kXR_read:
1116  req->read.offset = htonll( req->read.offset );
1117  req->read.rlen = htonl( req->read.rlen );
1118  break;
1119 
1120  //------------------------------------------------------------------------
1121  // kXR_write
1122  //------------------------------------------------------------------------
1123  case kXR_write:
1124  req->write.offset = htonll( req->write.offset );
1125  break;
1126 
1127  //------------------------------------------------------------------------
1128  // kXR_mv
1129  //------------------------------------------------------------------------
1130  case kXR_mv:
1131  req->mv.arg1len = htons( req->mv.arg1len );
1132  break;
1133 
1134  //------------------------------------------------------------------------
1135  // kXR_readv
1136  //------------------------------------------------------------------------
1137  case kXR_readv:
1138  {
1139  uint16_t numChunks = (req->readv.dlen)/16;
1140  readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1141  for( size_t i = 0; i < numChunks; ++i )
1142  {
1143  dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1144  dataChunk[i].offset = htonll( dataChunk[i].offset );
1145  }
1146  break;
1147  }
1148 
1149  //------------------------------------------------------------------------
1150  // kXR_writev
1151  //------------------------------------------------------------------------
1152  case kXR_writev:
1153  {
1154  uint16_t numChunks = (req->writev.dlen)/16;
1155  XrdProto::write_list *wrtList =
1156  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1157  for( size_t i = 0; i < numChunks; ++i )
1158  {
1159  wrtList[i].wlen = htonl( wrtList[i].wlen );
1160  wrtList[i].offset = htonll( wrtList[i].offset );
1161  }
1162 
1163  break;
1164  }
1165 
1166  case kXR_pgread:
1167  {
1168  req->pgread.offset = htonll( req->pgread.offset );
1169  req->pgread.rlen = htonl( req->pgread.rlen );
1170  break;
1171  }
1172 
1173  case kXR_pgwrite:
1174  {
1175  req->pgwrite.offset = htonll( req->pgwrite.offset );
1176  break;
1177  }
1178 
1179  //------------------------------------------------------------------------
1180  // kXR_prepare
1181  //------------------------------------------------------------------------
1182  case kXR_prepare:
1183  {
1184  req->prepare.optionX = htons( req->prepare.optionX );
1185  req->prepare.port = htons( req->prepare.port );
1186  break;
1187  }
1188 
1189  case kXR_chkpoint:
1190  {
1191  if( req->chkpoint.opcode == kXR_ckpXeq )
1192  MarshallRequest( msg + 24 );
1193  break;
1194  }
1195  };
1196 
1197  req->header.requestid = htons( req->header.requestid );
1198  req->header.dlen = htonl( req->header.dlen );
1199  return XRootDStatus();
1200  }
struct ClientTruncateRequest truncate
Definition: XProtocol.hh:875
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
struct ClientMkdirRequest mkdir
Definition: XProtocol.hh:858
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
struct ClientReadVRequest readv
Definition: XProtocol.hh:868
struct ClientOpenRequest open
Definition: XProtocol.hh:860
struct ClientRequestHdr header
Definition: XProtocol.hh:846
struct ClientWriteVRequest writev
Definition: XProtocol.hh:877
struct ClientLoginRequest login
Definition: XProtocol.hh:857
@ kXR_login
Definition: XProtocol.hh:119
struct ClientChmodRequest chmod
Definition: XProtocol.hh:850
struct ClientQueryRequest query
Definition: XProtocol.hh:866
struct ClientReadRequest read
Definition: XProtocol.hh:867
struct ClientMvRequest mv
Definition: XProtocol.hh:859
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
struct ClientPrepareRequest prepare
Definition: XProtocol.hh:864
struct ClientWriteRequest write
Definition: XProtocol.hh:876
struct ClientProtocolRequest protocol
Definition: XProtocol.hh:865
struct ClientLocateRequest locate
Definition: XProtocol.hh:856
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientReadVRequest::dlen, ClientWriteVRequest::dlen, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.
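The kXR_readv and kXR_writev cases convert every entry of a chunk list to network byte order. The following stand-alone sketch shows the same pattern with portable conversions; HostToNet32, HostToNet64, Chunk and MarshallChunks are hypothetical names, and the 16-byte chunk layout (4-byte handle, 32-bit length, 64-bit offset) is an assumption chosen to match the division by 16 in the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Portable host-to-network (big-endian) conversions; they play the role of
// htonl and htonll in the listing without depending on platform headers.
std::uint32_t HostToNet32( std::uint32_t v )
{
  unsigned char b[4] = { static_cast<unsigned char>( v >> 24 ),
                         static_cast<unsigned char>( v >> 16 ),
                         static_cast<unsigned char>( v >> 8 ),
                         static_cast<unsigned char>( v ) };
  std::uint32_t out;
  std::memcpy( &out, b, sizeof( out ) );
  return out;
}

std::uint64_t HostToNet64( std::uint64_t v )
{
  unsigned char b[8];
  for( int i = 7; i >= 0; --i )   // least significant byte goes last
  {
    b[i] = static_cast<unsigned char>( v & 0xff );
    v >>= 8;
  }
  std::uint64_t out;
  std::memcpy( &out, b, sizeof( out ) );
  return out;
}

// Hypothetical 16-byte chunk descriptor (assumed layout, see lead-in).
struct Chunk
{
  unsigned char fhandle[4];
  std::uint32_t rlen;
  std::uint64_t offset;
};
static_assert( sizeof( Chunk ) == 16, "sketch assumes a 16-byte chunk" );

// Converts every chunk in a list that is `dlen` bytes long.
void MarshallChunks( Chunk *chunks, std::uint32_t dlen )
{
  const std::size_t numChunks = dlen / sizeof( Chunk );
  for( std::size_t i = 0; i < numChunks; ++i )
  {
    chunks[i].rlen   = HostToNet32( chunks[i].rlen );
    chunks[i].offset = HostToNet64( chunks[i].offset );
  }
}
```

Because the conversions build the result byte by byte, the sketch behaves the same on little-endian and big-endian hosts.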

◆ MarshallRequest() [2/2]

static XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message * msg)
inlinestatic

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

176  {
177  MarshallRequest( msg->GetBuffer() );
178  msg->SetIsMarshalled( true );
179  return XRootDStatus();
180  }

References XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().

◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message & msg,
uint16_t  subStream,
AnyObject & channelData 
)
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1561 of file XrdClXRootDTransport.cc.

1564  {
1565  XRootDChannelInfo *info = 0;
1566  channelData.Get( info );
1567  XrdSysMutexHelper scopedLock( info->mutex );
1568  Log *log = DefaultEnv::GetLog();
1569 
1570  //--------------------------------------------------------------------------
1571  // Update the substream queues
1572  //--------------------------------------------------------------------------
1573  info->strmSelector->MsgReceived( subStream );
1574 
1575  //--------------------------------------------------------------------------
1576  // Check whether this message is a response to a request that has
1577  // timed out, and if so, drop it
1578  //--------------------------------------------------------------------------
1579  ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1580  if( rsp->hdr.status == kXR_attn )
1581  {
1582  return NoAction;
1583  }
1584 
1585  if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1586  {
1587  log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1588  "response that we're no longer interested in (timed out)",
1589  &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1590  //------------------------------------------------------------------------
1591  // If it is kXR_waitresp there will be another one,
1592  // so we don't release the sid yet
1593  //------------------------------------------------------------------------
1594  if( rsp->hdr.status != kXR_waitresp )
1595  info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1596  //------------------------------------------------------------------------
1597  // If it is a successful response to an open request
1598  // that timed out, we need to send a close
1599  //------------------------------------------------------------------------
1600  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1601  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1602  if( sidIt != info->sentOpens.end() )
1603  {
1604  info->sentOpens.erase( sidIt );
1605  if( rsp->hdr.status == kXR_ok ) return RequestClose;
1606  }
1607  return DigestMsg;
1608  }
1609 
1610  //--------------------------------------------------------------------------
1611  // If we have a wait or waitresp
1612  //--------------------------------------------------------------------------
1613  uint32_t seconds = 0;
1614  if( rsp->hdr.status == kXR_wait )
1615  seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1616  // to re-send the request
1617  else if( rsp->hdr.status == kXR_waitresp )
1618  {
1619  seconds = ntohl( rsp->body.waitresp.seconds );
1620 
1621  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1622  "setting up wait barrier.",
1623  info->streamName.c_str(),
1624  seconds );
1625  }
1626 
1627  time_t barrier = time(0) + seconds;
1628  if( info->waitBarrier < barrier )
1629  info->waitBarrier = barrier;
1630 
1631  //--------------------------------------------------------------------------
1632  // If we got a response to an open request, we may need to bump the counter
1633  // of open files
1634  //--------------------------------------------------------------------------
1635  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1636  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1637  if( sidIt != info->sentOpens.end() )
1638  {
1639  if( rsp->hdr.status == kXR_waitresp )
1640  return NoAction;
1641  info->sentOpens.erase( sidIt );
1642  if( rsp->hdr.status == kXR_ok )
1643  {
1644  ++info->openFiles;
1645  info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another File object instance has been bound to this connection
1646  }
1647  return NoAction;
1648  }
1649 
1650  //--------------------------------------------------------------------------
1651  // If we got a response to a close, we may need to decrement the counter of
1652  // open files
1653  //--------------------------------------------------------------------------
1654  sidIt = info->sentCloses.find( sid );
1655  if( sidIt != info->sentCloses.end() )
1656  {
1657  if( rsp->hdr.status == kXR_waitresp )
1658  return NoAction;
1659  info->sentCloses.erase( sidIt );
1660  --info->openFiles;
1661  return NoAction;
1662  }
1663  return NoAction;
1664  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ RequestClose
Send a close request.
const uint64_t XRootDMsg

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.
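The wait-barrier update in MessageReceived follows a simple rule: a kXR_wait reply adds five seconds to the server's delay, a kXR_waitresp reply uses the delay as is, and the barrier only ever moves forward. A minimal sketch of that rule, with hypothetical names (Kind, UpdateBarrier) and the times passed in explicitly:

```cpp
#include <cassert>
#include <cstdint>
#include <ctime>

// Hypothetical classification of the response status.
enum class Kind { Wait, WaitResp, Other };

// Returns the new wait barrier given the current one.
std::time_t UpdateBarrier( std::time_t current, std::time_t now,
                           Kind kind, std::uint32_t serverSeconds )
{
  std::uint32_t seconds = 0;
  if( kind == Kind::Wait )
    seconds = serverSeconds + 5;   // extra time to re-send the request
  else if( kind == Kind::WaitResp )
    seconds = serverSeconds;

  const std::time_t barrier = now + static_cast<std::time_t>( seconds );
  return current < barrier ? barrier : current;  // never move backwards
}
```

For any other response the candidate barrier is just the current time, so an existing later barrier is left untouched.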

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message * msg,
uint16_t  subStream,
uint32_t  bytesSent,
AnyObject & channelData 
)
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1669 of file XrdClXRootDTransport.cc.

1673  {
1674  // Called when a message has been sent. For messages that return on a
1675  // different pathid (and hence may use a different poller) it is possible
1676  // that the server has already replied and the reply will trigger
1677  // MessageReceived() before this method has been called. However for open
1678  // and close this is never the case and this method is used for tracking
1679  // only those.
1680  XRootDChannelInfo *info = 0;
1681  channelData.Get( info );
1682  XrdSysMutexHelper scopedLock( info->mutex );
1683  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1684  uint16_t reqid = ntohs( req->header.requestid );
1685 
1686 
1687  //--------------------------------------------------------------------------
1688  // We need to track opens to know if we can close streams due to idleness
1689  //--------------------------------------------------------------------------
1690  uint16_t sid;
1691  memcpy( &sid, req->header.streamid, 2 );
1692 
1693  if( reqid == kXR_open )
1694  info->sentOpens.insert( sid );
1695  else if( reqid == kXR_close )
1696  info->sentCloses.insert( sid );
1697  }
kXR_char streamid[2]
Definition: XProtocol.hh:156

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.
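
The bookkeeping in the listing above can be sketched in isolation. This is a minimal illustration, not the XrdCl implementation: `MiniRequestHdr`, `OpenCloseTracker`, `MakeWireHdr` and the two request codes are hypothetical stand-ins for `ClientRequestHdr`, `XRootDChannelInfo` and `kXR_open`/`kXR_close`. Only the pattern follows the listing: convert the request ID from network byte order, copy the two stream-ID bytes verbatim, and record the ID in a set.

```cpp
#include <arpa/inet.h>  // htons, ntohs (POSIX)
#include <cassert>
#include <cstdint>
#include <cstring>
#include <set>

// Placeholder request codes; the real kXR_open / kXR_close values are
// defined in XProtocol.hh and are not reproduced here.
constexpr uint16_t kOpenCode  = 1;
constexpr uint16_t kCloseCode = 2;

// Hypothetical header prefix: two stream-ID bytes followed by the request ID
// in network byte order.
struct MiniRequestHdr
{
  unsigned char streamid[2];
  uint16_t      requestid;
};

struct OpenCloseTracker
{
  std::set<uint16_t> sentOpens;
  std::set<uint16_t> sentCloses;

  // buffer points at the start of an outgoing, already marshalled request.
  void MessageSent( const void *buffer )
  {
    assert( buffer != nullptr );
    MiniRequestHdr hdr;
    std::memcpy( &hdr, buffer, sizeof( hdr ) );

    uint16_t reqid = ntohs( hdr.requestid );
    uint16_t sid;
    std::memcpy( &sid, hdr.streamid, 2 ); // the ID is used as an opaque 2-byte key

    if( reqid == kOpenCode )       sentOpens.insert( sid );
    else if( reqid == kCloseCode ) sentCloses.insert( sid );
  }
};

// Builds a header as it would appear on the wire.
inline MiniRequestHdr MakeWireHdr( unsigned char s0, unsigned char s1,
                                   uint16_t reqid )
{
  MiniRequestHdr h;
  h.streamid[0] = s0;
  h.streamid[1] = s1;
  h.requestid   = htons( reqid );
  return h;
}
```

Requests other than open and close leave both sets untouched, which matches the listing's comment that only those two request types are tracked here.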

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message *msg,
AnyObject &channelData,
PathID *hint = 0 
)
virtual

Return the ID of the up stream through which this message should be sent and of the down stream on which the answer should be expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 822 of file XrdClXRootDTransport.cc.

823  {
824  return PathID( 0, 0 );
825  }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message *msg,
AnyObject &channelData,
PathID *hint = 0 
)
virtual

Return the ID of the up substream through which this message should be sent and of the down substream on which the answer should be expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted substream.

Implements XrdCl::TransportHandler.

Definition at line 830 of file XrdClXRootDTransport.cc.

833  {
834  XRootDChannelInfo *info = 0;
835  channelData.Get( info );
836  XrdSysMutexHelper scopedLock( info->mutex );
837 
838  //--------------------------------------------------------------------------
839  // If we're not connected to a data server or we don't know that yet
840  // we stream through 0
841  //--------------------------------------------------------------------------
842  if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
843  return PathID( 0, 0 );
844 
845  //--------------------------------------------------------------------------
846  // Select the streams
847  //--------------------------------------------------------------------------
848  Log *log = DefaultEnv::GetLog();
849  uint16_t upStream = 0;
850  uint16_t downStream = 0;
851 
852  if( hint )
853  {
854  upStream = hint->up;
855  downStream = hint->down;
856  }
857  else
858  {
859  upStream = 0;
860  std::vector<bool> connected;
861  connected.reserve( info->stream.size() - 1 );
862  size_t nbConnected = 0;
863  for( size_t i = 1; i < info->stream.size(); ++i )
864  if( info->stream[i].status == XRootDStreamInfo::Connected )
865  {
866  connected.push_back( true );
867  ++nbConnected;
868  }
869  else
870  connected.push_back( false );
871 
872  if( nbConnected == 0 )
873  downStream = 0;
874  else
875  downStream = info->strmSelector->Select( connected );
876  }
877 
878  if( upStream >= info->stream.size() )
879  {
880  log->Debug( XRootDTransportMsg,
881  "[%s] Up link stream %d does not exist, using 0",
882  info->streamName.c_str(), upStream );
883  upStream = 0;
884  }
885 
886  if( downStream >= info->stream.size() )
887  {
888  log->Debug( XRootDTransportMsg,
889  "[%s] Down link stream %d does not exist, using 0",
890  info->streamName.c_str(), downStream );
891  downStream = 0;
892  }
893 
894  //--------------------------------------------------------------------------
895  // Modify the message
896  //--------------------------------------------------------------------------
897  UnMarshallRequest( msg );
898  ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
899  switch( hdr->requestid )
900  {
901  //------------------------------------------------------------------------
902  // Read - we update the path id to tell the server where we want to
903  // get the response, but we still send the request through stream 0
904  // We need to allocate space for read_args if we don't have it
905  // included yet
906  //------------------------------------------------------------------------
907  case kXR_read:
908  {
909  if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
910  {
911  msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
912  void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
913  memset( newBuf, 0, 8 );
914  ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
915  req->dlen += 8;
916  }
917  read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
918  args->pathid = info->stream[downStream].pathId;
919  break;
920  }
921 
922 
923  //------------------------------------------------------------------------
924  // PgRead - we update the path id to tell the server where we want to
925  // get the response, but we still send the request through stream 0
926  // We need to allocate space for ClientPgReadReqArgs if we don't have it
927  // included yet
928  //------------------------------------------------------------------------
929  case kXR_pgread:
930  {
931  if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
932  {
933  msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
934  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
935  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
936  ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
937  req->dlen += sizeof( ClientPgReadReqArgs );
938  }
939  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
940  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
941  args->pathid = info->stream[downStream].pathId;
942  break;
943  }
944 
945  //------------------------------------------------------------------------
946  // ReadV - the situation is identical to read but we don't need any
947  // additional structures to specify the return path
948  //------------------------------------------------------------------------
949  case kXR_readv:
950  {
951  ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
952  req->pathid = info->stream[downStream].pathId;
953  break;
954  }
955 
956  //------------------------------------------------------------------------
957  // Write - multiplexing writes doesn't work properly in the server
958  //------------------------------------------------------------------------
959  case kXR_write:
960  {
961 // ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
962 // req->pathid = info->stream[downStream].pathId;
963  break;
964  }
965 
966  //------------------------------------------------------------------------
967  // WriteV - multiplexing writes doesn't work properly in the server
968  //------------------------------------------------------------------------
969  case kXR_writev:
970  {
971 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
972 // req->pathid = info->stream[downStream].pathId;
973  break;
974  }
975 
976  //------------------------------------------------------------------------
977  // PgWrite - multiplexing writes doesn't work properly in the server
978  //------------------------------------------------------------------------
979  case kXR_pgwrite:
980  {
981 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
982 // req->pathid = info->stream[downStream].pathId;
983  break;
984  }
985  };
986  MarshallRequest( msg );
987  return PathID( upStream, downStream );
988  }
kXR_char pathid
Definition: XProtocol.hh:653
kXR_int32 dlen
Definition: XProtocol.hh:648
static XRootDStatus UnMarshallRequest(Message *msg)

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, read_args::pathid, ClientReadVRequest::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.
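
The down-stream selection step in the listing above (build a `connected` vector for streams 1..N-1, fall back to stream 0 when none is connected) can be sketched as follows. `RoundRobinSelector` and `PickDownStream` are hypothetical names; the selection policy of the real `strmSelector` is not shown in this section, so round-robin is an assumption made only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical round-robin selector. XrdCl's actual stream selector may use
// a different policy (for example one based on queue lengths).
class RoundRobinSelector
{
  public:
    // connected[i] describes data stream i+1; returns a stream number >= 1.
    // Precondition: at least one entry is true.
    uint16_t Select( const std::vector<bool> &connected )
    {
      assert( !connected.empty() );
      for( size_t n = 0; n < connected.size(); ++n )
      {
        size_t idx = ( pNext + n ) % connected.size();
        if( connected[idx] )
        {
          pNext = idx + 1;
          return static_cast<uint16_t>( idx + 1 );
        }
      }
      return 0; // not reached when the precondition holds
    }

  private:
    size_t pNext = 0;
};

// streamUp[i] tells whether stream i is connected; stream 0 is the control
// stream and is never offered to the selector.
inline uint16_t PickDownStream( const std::vector<bool> &streamUp,
                                RoundRobinSelector      &selector )
{
  if( streamUp.size() < 2 ) return 0;
  std::vector<bool> connected( streamUp.begin() + 1, streamUp.end() );
  bool any = false;
  for( bool c : connected ) any = any || c;
  return any ? selector.Select( connected ) : 0;
}
```

As in the listing, the request itself would still travel on stream 0; only the stream on which the response is expected changes.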

◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject &channelData)
static

Number of currently connected data streams.

Definition at line 1468 of file XrdClXRootDTransport.cc.

1469  {
1470  XRootDChannelInfo *info = 0;
1471  channelData.Get( info );
1472  XrdSysMutexHelper scopedLock( info->mutex );
1473 
1474  uint16_t nbConnected = 0;
1475  for( size_t i = 1; i < info->stream.size(); ++i )
1476  if( info->stream[i].status == XRootDStreamInfo::Connected )
1477  ++nbConnected;
1478 
1479  return nbConnected;
1480  }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, and XrdCl::XRootDChannelInfo::stream.

Referenced by XrdCl::Channel::NbConnectedStrm().

◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return whether a control connection needs to be valid before other connections are established.

Definition at line 167 of file XrdClXRootDTransport.hh.

168  {
169  return true;
170  }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData *handShakeData,
AnyObject &channelData 
)
virtual
Returns
true if encryption should be turned on, false otherwise

Implements XrdCl::TransportHandler.

Definition at line 1763 of file XrdClXRootDTransport.cc.

1765  {
1766  XRootDChannelInfo *info = 0;
1767  channelData.Get( info );
1768 
1769  Env *env = DefaultEnv::GetEnv();
1770  int notlsok = DefaultNoTlsOK;
1771  env->GetInt( "NoTlsOK", notlsok );
1772 
1773  if( notlsok )
1774  return info->encrypted;
1775 
1776  // Did the server instruct us to switch to TLS right away?
1777  if( info->serverFlags & kXR_gotoTLS )
1778  {
1779  info->encrypted = true;
1780  return true ;
1781  }
1782 
1783  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1784 
1785  //--------------------------------------------------------------------------
1786  // The control stream (sub-stream 0) might need to switch to TLS before
1787  // login or after login
1788  //--------------------------------------------------------------------------
1789  if( handShakeData->subStreamId == 0 )
1790  {
1791  //------------------------------------------------------------------------
1792  // We are about to login and the server asked to start encrypting
1793  // before login
1794  //------------------------------------------------------------------------
1795  if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1796  ( info->serverFlags & kXR_tlsLogin ) )
1797  {
1798  info->encrypted = true;
1799  return true;
1800  }
1801 
1802  //--------------------------------------------------------------------
1803  // The hand-shake is done and the server requested to encrypt the session
1804  //--------------------------------------------------------------------
1805  if( (sInfo.status == XRootDStreamInfo::Connected ||
1806  //--------------------------------------------------------------------
1807  // we really need to turn on TLS before we sent kXR_endsess and we
1808  // are about to do so (1st enable encryption, then send kXR_endsess)
1809  //--------------------------------------------------------------------
1810  sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1811  ( info->serverFlags & kXR_tlsSess ) )
1812  {
1813  info->encrypted = true;
1814  return true;
1815  }
1816  }
1817  //--------------------------------------------------------------------------
1818  // A data stream (sub-stream > 0) if need be will be switched to TLS before
1819  // bind.
1820  //--------------------------------------------------------------------------
1821  else
1822  {
1823  //------------------------------------------------------------------------
1824  // We are about to bind a data stream and the server asked to start
1825  // encrypting before bind
1826  //------------------------------------------------------------------------
1827  if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1828  ( info->serverFlags & kXR_tlsData ) )
1829  {
1830  info->encrypted = true;
1831  return true;
1832  }
1833  }
1834 
1835  return false;
1836  }
#define kXR_tlsLogin
Definition: XProtocol.hh:1184
#define kXR_gotoTLS
Definition: XProtocol.hh:1180
#define kXR_tlsSess
Definition: XProtocol.hh:1185
#define kXR_tlsData
Definition: XProtocol.hh:1182
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
const int DefaultNoTlsOK

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.
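
The decision made in the listing above can be condensed into a pure function. The sketch below is an assumption-laden simplification: the flag constants are placeholder bits (the real `kXR_gotoTLS`, `kXR_tlsLogin`, `kXR_tlsSess` and `kXR_tlsData` masks are defined in XProtocol.hh), `StreamStatus` and `NeedTls` are hypothetical names, and the `NoTlsOK` override as well as the side effect of setting `info->encrypted` are left out.

```cpp
#include <cassert>
#include <cstdint>

// Placeholder bit values, chosen only so that the flags are distinct.
constexpr uint32_t kGotoTLS  = 1u << 0;
constexpr uint32_t kTlsLogin = 1u << 1;
constexpr uint32_t kTlsSess  = 1u << 2;
constexpr uint32_t kTlsData  = 1u << 3;

enum class StreamStatus
{
  Disconnected, LoginSent, BindSent, Connected, EndSessionSent
};

inline bool NeedTls( uint32_t flags, uint16_t subStreamId, StreamStatus status )
{
  // The server wants TLS immediately, regardless of stream or state.
  if( flags & kGotoTLS ) return true;

  if( subStreamId == 0 )
  {
    // Control stream: switch before login ...
    if( status == StreamStatus::LoginSent && ( flags & kTlsLogin ) != 0 )
      return true;

    // ... or once the session is established (or about to be ended).
    bool sessionPhase = status == StreamStatus::Connected ||
                        status == StreamStatus::EndSessionSent;
    return sessionPhase && ( flags & kTlsSess ) != 0;
  }

  // Data stream: switch before bind if the server asked for encrypted data.
  return status == StreamStatus::BindSent && ( flags & kTlsData ) != 0;
}
```

Keeping the decision free of side effects makes the per-stream, per-state rules easy to tabulate and test, at the cost of omitting the state update the real method performs.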

◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t  query,
AnyObject &result,
AnyObject &channelData 
)
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1513 of file XrdClXRootDTransport.cc.

1516  {
1517  XRootDChannelInfo *info = 0;
1518  channelData.Get( info );
1519  XrdSysMutexHelper scopedLock( info->mutex );
1520 
1521  switch( query )
1522  {
1523  //------------------------------------------------------------------------
1524  // Protocol name
1525  //------------------------------------------------------------------------
1526  case TransportQuery::Name:
1527  result.Set( (const char*)"XRootD", false );
1528  return Status();
1529 
1530  //------------------------------------------------------------------------
1531  // Authentication
1532  //------------------------------------------------------------------------
1533  case TransportQuery::Auth:
1534  result.Set( new std::string( info->authProtocolName ), false );
1535  return Status();
1536 
1537  //------------------------------------------------------------------------
1538  // Server flags
1539  //------------------------------------------------------------------------
1540  case XRootDQuery::ServerFlags:
1541  result.Set( new int( info->serverFlags ), false );
1542  return Status();
1543 
1544  //------------------------------------------------------------------------
1545  // Protocol version
1546  //------------------------------------------------------------------------
1547  case XRootDQuery::ProtocolVersion:
1548  result.Set( new int( info->protocolVersion ), false );
1549  return Status();
1550 
1551  case XRootDQuery::IsEncrypted:
1552  result.Set( new bool( info->encrypted ), false );
1553  return Status();
1554  };
1555  return Status( stError, errQueryNotSupported );
1556  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), and XrdCl::stError.

◆ SetDescription()

static void XrdCl::XRootDTransport::SetDescription ( Message *msg)
inlinestatic

Generate the description of a message and store it in the message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246  {
247  std::ostringstream o;
248  GenerateDescription( msg->GetBuffer(), o );
249  msg->SetDescription( o.str() );
250  }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::Open(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileSystem::Stat(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().

◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject &channelData)
virtual

Return the number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 995 of file XrdClXRootDTransport.cc.

996  {
997  XRootDChannelInfo *info = 0;
998  channelData.Get( info );
999  XrdSysMutexHelper scopedLock( info->mutex );
1000 
1001  //--------------------------------------------------------------------------
1002  // If the connection has been opened in order to orchestrate a TPC or
1003  // the remote server is a Manager or Metamanager we will need only one
1004  // (control) stream.
1005  //--------------------------------------------------------------------------
1006  if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1007 
1008  //--------------------------------------------------------------------------
1009  // Number of streams requested by user
1010  //--------------------------------------------------------------------------
1011  uint16_t ret = info->stream.size();
1012 
1013  Env *env = DefaultEnv::GetEnv();
1014  int nodata = DefaultTlsNoData;
1015  env->GetInt( "TlsNoData", nodata );
1016 
1017  // Does the server require the stream 0 to be encrypted?
1018  bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1019  ( info->serverFlags & kXR_tlsLogin ) ||
1020  ( info->serverFlags & kXR_tlsSess );
1021  // Does the server NOT require the data streams to be encrypted?
1022  bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1023  // Does the user require the stream 0 to be encrypted?
1024  bool usrTlsStrm0 = info->encrypted;
1025  // Does the user NOT require the data streams to be encrypted?
1026  bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1027 
1028  if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029  ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1030  {
1031  //------------------------------------------------------------------------
1032  // The server or user asked us to encrypt stream 0, but to send the data
1033  // (read/write) using a plain TCP connection
1034  //------------------------------------------------------------------------
1035  if( ret == 1 ) ++ret;
1036  }
1037 
1038  if( ret > info->stream.size() )
1039  {
1040  info->stream.resize( ret );
1041  info->strmSelector->AdjustQueues( ret );
1042  }
1043 
1044  return ret;
1045  }
const int DefaultTlsNoData

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.
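
The stream-count rule in the listing above reduces to a small piece of boolean logic. The sketch below restates it under stated assumptions: `ChannelFacts` and `SubStreamCount` are hypothetical names, the server flags are pre-decoded into booleans, and the resizing of `info->stream` is omitted. Note that `!encrypted || ( encrypted && nodata )` has the same truth table as `!encrypted || nodata`, which is the form used here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, pre-decoded view of the channel state used by the rule.
struct ChannelFacts
{
  uint16_t requested    = 1;     // number of streams currently configured
  bool     isTpc        = false; // connection orchestrates a third-party copy
  bool     isDataServer = true;  // false for a manager or meta-manager
  bool     usrEncrypted = false; // user asked for an encrypted channel
  bool     tlsNoData    = false; // user setting: keep data streams unencrypted
  bool     srvTlsStrm0  = false; // server wants TLS on the control stream
  bool     srvTlsData   = false; // server wants TLS on the data streams
};

inline uint16_t SubStreamCount( const ChannelFacts &f )
{
  // TPC orchestration and non-data servers only need the control stream.
  if( f.isTpc || !f.isDataServer ) return 1;

  bool srvNoTlsData = !f.srvTlsData;
  bool usrNoTlsData = !f.usrEncrypted || f.tlsNoData;

  // Stream 0 is encrypted but data may travel in plain text: make sure a
  // separate data stream exists.
  bool split = ( f.usrEncrypted && usrNoTlsData && srvNoTlsData ) ||
               ( f.srvTlsStrm0  && srvNoTlsData && usrNoTlsData );

  if( split && f.requested == 1 ) return 2;
  return f.requested;
}
```

The extra stream is only added when exactly one stream was configured; with two or more streams, a plain data stream is already available.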

◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message &msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1381 of file XrdClXRootDTransport.cc.

1382  {
1383  ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1384  uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1385 
1386  switch( reqType )
1387  {
1388  case kXR_pgwrite:
1389  {
1390  //--------------------------------------------------------------------------
1391  // If there's no additional data there's nothing to unmarshal
1392  //--------------------------------------------------------------------------
1393  if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1394  //--------------------------------------------------------------------------
1395  // If there's not enough data to form correction-segment report an error
1396  //--------------------------------------------------------------------------
1397  if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1398  return XRootDStatus( stError, errInvalidMessage, 0,
1399  "kXR_status: invalid message size." );
1400 
1401  //--------------------------------------------------------------------------
1402  // Calculate the crc32c for the additional data
1403  //--------------------------------------------------------------------------
1404  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) );
1405  cse->cseCRC = ntohl( cse->cseCRC );
1406  size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1407  void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1408  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1409 
1410  //--------------------------------------------------------------------------
1411  // Do the integrity checks
1412  //--------------------------------------------------------------------------
1413  if( crcval != cse->cseCRC )
1414  {
1415  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1416  "corrupted (crc32c integrity check failed)." );
1417  }
1418 
1419  cse->dlFirst = ntohs( cse->dlFirst );
1420  cse->dlLast = ntohs( cse->dlLast );
1421 
1422  size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1423  sizeof( kXR_int64 );
1424  kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1425  sizeof( ServerResponseBody_pgWrCSE ) );
1426 
1427  for( size_t i = 0; i < pgcnt; ++i )
1428  pgoffs[i] = ntohll( pgoffs[i] );
1429 
1430  return XRootDStatus();
1431  break;
1432  }
1433 
1434  default:
1435  break;
1436  }
1437 
1438  return XRootDStatus( stError, errNotSupported );
1439  }
ServerResponseStatus status
Definition: XProtocol.hh:1310
@ kXR_1stRequest
Definition: XProtocol.hh:111
long long kXR_int64
Definition: XPtypes.hh:98
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.

Referenced by GetMore().
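
The page-offset arithmetic in the listing above (`pgcnt = ( dlen - sizeof( ServerResponseBody_pgWrCSE ) ) / sizeof( kXR_int64 )`, followed by a 64-bit byte swap per entry) can be sketched without the protocol headers. The 8-byte size of the correction-segment prefix is an assumption derived from the fields visible in the listing (a 32-bit CRC and two 16-bit lengths); `LoadBigEndian64` and `ParsePageOffsets` are hypothetical helpers, and the CRC check is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed size of the correction-segment prefix: 4-byte CRC + 2 x 2-byte lengths.
constexpr size_t kCseSize = 8;

// Portable big-endian load; plays the role of ntohll() in the listing.
inline uint64_t LoadBigEndian64( const unsigned char *p )
{
  uint64_t v = 0;
  for( int i = 0; i < 8; ++i ) v = ( v << 8 ) | p[i];
  return v;
}

// body points at the correction segment; dlen is its total length in bytes.
inline std::vector<uint64_t> ParsePageOffsets( const unsigned char *body,
                                               size_t               dlen )
{
  assert( dlen >= kCseSize ); // the listing reports an error in this case
  size_t pgcnt = ( dlen - kCseSize ) / sizeof( uint64_t );

  std::vector<uint64_t> offsets;
  offsets.reserve( pgcnt );
  for( size_t i = 0; i < pgcnt; ++i )
    offsets.push_back( LoadBigEndian64( body + kCseSize + 8 * i ) );
  return offsets;
}
```

Unlike the listing, which converts the offsets in place inside the message buffer, this sketch copies them into a vector so that the input can stay `const`.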

◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message *msg,
uint16_t  reqType 
)
static

Unmarshall the body of the incoming message.

Definition at line 1227 of file XrdClXRootDTransport.cc.

1228  {
1229  ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1230 
1231  //--------------------------------------------------------------------------
1232  // kXR_ok
1233  //--------------------------------------------------------------------------
1234  if( m->hdr.status == kXR_ok )
1235  {
1236  switch( reqType )
1237  {
1238  //----------------------------------------------------------------------
1239  // kXR_protocol
1240  //----------------------------------------------------------------------
1241  case kXR_protocol:
1242  if( m->hdr.dlen < 8 )
1243  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1244  m->body.protocol.pval = ntohl( m->body.protocol.pval );
1245  m->body.protocol.flags = ntohl( m->body.protocol.flags );
1246  break;
1247  }
1248  }
1249  //--------------------------------------------------------------------------
1250  // kXR_error
1251  //--------------------------------------------------------------------------
1252  else if( m->hdr.status == kXR_error )
1253  {
1254  if( m->hdr.dlen < 4 )
1255  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1256  m->body.error.errnum = ntohl( m->body.error.errnum );
1257  }
1258 
1259  //--------------------------------------------------------------------------
1260  // kXR_wait
1261  //--------------------------------------------------------------------------
1262  else if( m->hdr.status == kXR_wait )
1263  {
1264  if( m->hdr.dlen < 4 )
1265  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1266  m->body.wait.seconds = htonl( m->body.wait.seconds );
1267  }
1268 
1269  //--------------------------------------------------------------------------
1270  // kXR_redirect
1271  //--------------------------------------------------------------------------
1272  else if( m->hdr.status == kXR_redirect )
1273  {
1274  if( m->hdr.dlen < 4 )
1275  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1276  m->body.redirect.port = htonl( m->body.redirect.port );
1277  }
1278 
1279  //--------------------------------------------------------------------------
1280  // kXR_waitresp
1281  //--------------------------------------------------------------------------
1282  else if( m->hdr.status == kXR_waitresp )
1283  {
1284  if( m->hdr.dlen < 4 )
1285  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1286  m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1287  }
1288 
1289  //--------------------------------------------------------------------------
1290  // kXR_attn
1291  //--------------------------------------------------------------------------
1292  else if( m->hdr.status == kXR_attn )
1293  {
1294  if( m->hdr.dlen < 4 )
1295  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1296  m->body.attn.actnum = htonl( m->body.attn.actnum );
1297  }
1298 
1299  return XRootDStatus();
1300  }
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_error
Definition: XProtocol.hh:903

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().

◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message &msg)
static

Unmarshall the header of the incoming message.

Definition at line 1444 of file XrdClXRootDTransport.cc.

1445  {
1446  ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1447  header->status = ntohs( header->status );
1448  header->dlen = ntohl( header->dlen );
1449  }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.

Referenced by GetHeader().
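
For illustration, the header conversion above can be reproduced on a raw 8-byte buffer. `ResponseHeader` below is a hypothetical mirror of `ServerResponseHeader` (two stream-ID bytes, a 16-bit status and a 32-bit data length), and `ParseHeader` is an assumed helper name; the sketch copies the bytes out instead of converting them in place as the listing does.

```cpp
#include <arpa/inet.h>  // ntohs, ntohl (POSIX)
#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical mirror of the 8-byte response header.
struct ResponseHeader
{
  unsigned char streamid[2]; // opaque, not byte-swapped
  uint16_t      status;      // network byte order on the wire
  int32_t       dlen;        // network byte order on the wire
};
static_assert( sizeof( ResponseHeader ) == 8, "header must be 8 bytes" );

inline ResponseHeader ParseHeader( const unsigned char *wire )
{
  assert( wire != nullptr );
  ResponseHeader h;
  std::memcpy( &h, wire, sizeof( h ) );
  h.status = ntohs( h.status );
  h.dlen   = static_cast<int32_t>( ntohl( static_cast<uint32_t>( h.dlen ) ) );
  return h;
}
```

The stream ID is left untouched because it is only ever compared byte for byte, never interpreted as a number.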

◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message *msg)
static

Unmarshall the request. Requests sometimes need to be rewritten, in which case they have to be unmarshalled first.

Definition at line 1206 of file XrdClXRootDTransport.cc.

1207  {
1208  if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1209  // We rely on the marshaling process to be symmetric!
1210  // First we unmarshall the request ID and the length because
1211  // MarshallRequest() relies on these, and then we need to unmarshall these
1212  // two again, because they get marshalled in MarshallRequest().
1213  // All this is pretty damn ugly and should be rewritten.
1214  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1215  req->header.requestid = htons( req->header.requestid );
1216  req->header.dlen = htonl( req->header.dlen );
1217  XRootDStatus st = MarshallRequest( msg );
1218  req->header.requestid = htons( req->header.requestid );
1219  req->header.dlen = htonl( req->header.dlen );
1220  msg->SetIsMarshalled( false );
1221  return st;
1222  }
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().
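
The listing above relies on marshalling being symmetric: a byte-order conversion applied twice restores the original value, and `htons`/`ntohs` (like `htonl`/`ntohl`) perform the same operation on any given platform. The following simplified sketch shows that property on a hypothetical `MiniRequest` with only two header fields; it does not reproduce the per-request body handling that `MarshallRequest()` performs.

```cpp
#include <arpa/inet.h>  // htons, htonl, ntohs, ntohl (POSIX)
#include <cassert>
#include <cstdint>

// Hypothetical request reduced to the two fields the listing swaps explicitly.
struct MiniRequest
{
  uint16_t requestid;
  uint32_t dlen;
  bool     marshalled; // true when the fields are in network byte order
};

inline void Marshall( MiniRequest &r )
{
  if( r.marshalled ) return;            // already done, nothing to do
  r.requestid  = htons( r.requestid );
  r.dlen       = htonl( r.dlen );
  r.marshalled = true;
}

inline void UnMarshall( MiniRequest &r )
{
  if( !r.marshalled ) return;           // mirrors the suAlreadyDone early exit
  r.requestid  = ntohs( r.requestid );  // same byte swap as htons
  r.dlen       = ntohl( r.dlen );
  r.marshalled = false;
}
```

Tracking the marshalled state explicitly is what makes repeated calls safe; without the flag, a second `UnMarshall` would swap the fields back into network order.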

◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message &msg,
uint16_t  reqType 
)
static

Unmarshall the body of the status response.

Definition at line 1305 of file XrdClXRootDTransport.cc.

1306  {
1307  //--------------------------------------------------------------------------
1308  // Calculate the crc32c before unmarshalling the body!
1309  //--------------------------------------------------------------------------
1310  ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1311  char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1312  size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1313  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1314 
1315  size_t stlen = sizeof( ServerResponseStatus );
1316  switch( reqType )
1317  {
1318  case kXR_pgread:
1319  {
1320  stlen += sizeof( ServerResponseBody_pgRead );
1321  break;
1322  }
1323 
1324  case kXR_pgwrite:
1325  {
1326  stlen += sizeof( ServerResponseBody_pgWrite );
1327  break;
1328  }
1329  }
1330 
1331  if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1332  "kXR_status: invalid message size." );
1333 
1334  rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1335  rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1336 
1337  switch( reqType )
1338  {
1339  case kXR_pgread:
1340  {
1341  ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1342  pgrdbdy->offset = ntohll( pgrdbdy->offset );
1343  break;
1344  }
1345 
1346  case kXR_pgwrite:
1347  {
1348  ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1349  pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1350  break;
1351  }
1352  }
1353 
1354  //--------------------------------------------------------------------------
1355  // Do the integrity checks
1356  //--------------------------------------------------------------------------
1357  if( crcval != rspst->bdy.crc32c )
1358  {
1359  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1360  "corrupted (crc32c integrity check failed)." );
1361  }
1362 
1363  if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1364  rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1365  {
1366  return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1367  "(stream ID mismatch)." );
1368  }
1369 
1370 
1371 
1372  if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1373  {
1374  return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1375  "(request ID mismatch)." );
1376  }
1377 
1378  return XRootDStatus();
1379  }

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseHeader::streamid, and ServerResponseBody_Status::streamID.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().
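
The ordering used in the listing above (checksum the raw wire bytes first, byte-swap afterwards, then compare) can be illustrated in isolation. The following is a minimal sketch, not the XRootD implementation: the frame layout, `Crc32c`, `LoadBE32`, `MakeFrame`, and `CheckFrame` are all hypothetical names introduced for illustration, and the bitwise CRC32C routine merely stands in for XrdOucCRC::Calc32C.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Bitwise CRC32C (Castagnoli, reflected polynomial 0x82F63B78).
// Dependency-free stand-in for XrdOucCRC::Calc32C; slow, for illustration only.
uint32_t Crc32c( const void *data, size_t len )
{
  const uint8_t *p = static_cast<const uint8_t*>( data );
  uint32_t crc = 0xFFFFFFFFu;
  for( size_t i = 0; i < len; ++i )
  {
    crc ^= p[i];
    for( int k = 0; k < 8; ++k )
      crc = ( crc >> 1 ) ^ ( 0x82F63B78u & ( 0u - ( crc & 1u ) ) );
  }
  return crc ^ 0xFFFFFFFFu;
}

// Decode a 32-bit big-endian (network order) field, as ntohl would.
uint32_t LoadBE32( const uint8_t *p )
{
  return ( uint32_t( p[0] ) << 24 ) | ( uint32_t( p[1] ) << 16 ) |
         ( uint32_t( p[2] ) << 8 )  |   uint32_t( p[3] );
}

// Hypothetical frame layout (NOT the real ServerResponseStatus):
//   bytes 0..3 : CRC32C of all following bytes, big-endian
//   bytes 4..  : payload covered by the checksum
enum class FrameCheck { Ok, TooShort, CrcMismatch };

// Build a frame by prepending the big-endian checksum to the payload.
std::vector<uint8_t> MakeFrame( const std::vector<uint8_t> &payload )
{
  uint32_t crc = Crc32c( payload.data(), payload.size() );
  std::vector<uint8_t> frame = { uint8_t( crc >> 24 ), uint8_t( crc >> 16 ),
                                 uint8_t( crc >> 8 ),  uint8_t( crc ) };
  frame.insert( frame.end(), payload.begin(), payload.end() );
  return frame;
}

// Validate a frame: size check, checksum over the raw bytes, then compare
// against the stored value converted to host byte order.
FrameCheck CheckFrame( const std::vector<uint8_t> &frame )
{
  if( frame.size() < 4 ) return FrameCheck::TooShort;
  uint32_t computed = Crc32c( frame.data() + 4, frame.size() - 4 );
  uint32_t stored   = LoadBE32( frame.data() );
  return computed == stored ? FrameCheck::Ok : FrameCheck::CrcMismatch;
}
```

A caller would pass the received bytes to `CheckFrame` and only interpret the payload when it returns `FrameCheck::Ok`. Computing the checksum before any in-place byte swapping matters because the sender calculated it over the network-order representation.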


◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1754 of file XrdClXRootDTransport.cc.

{
  XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
  pSecUnloadHandler->unloaded = true;
}

References XrdCl::PluginUnloadHandler::lock, and XrdCl::PluginUnloadHandler::unloaded.
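
The pattern in WaitBeforeExit() (take the write lock, then set a flag) can be sketched with the standard library. This is an illustration under assumptions, not XRootD code: `UnloadGuard` and `UsePlugin` are hypothetical names, `std::shared_mutex` stands in for the XrdSysRWLock member, and the reader side shown here is only a guess at how code using the plugin would cooperate with the flag.

```cpp
#include <mutex>
#include <shared_mutex>

// Hypothetical stand-in for PluginUnloadHandler: a reader/writer lock plus a flag.
struct UnloadGuard
{
  std::shared_mutex lock;
  bool              unloaded = false;
};

// Reader side (assumed): hold a shared lock while using the plugin and
// refuse to proceed once the flag has been set.
bool UsePlugin( UnloadGuard &g )
{
  std::shared_lock<std::shared_mutex> scope( g.lock );
  if( g.unloaded ) return false;
  // ... calls into the plugin would go here ...
  return true;
}

// Writer side, mirroring WaitBeforeExit(): acquiring the exclusive lock blocks
// until every reader has released its shared lock; the flag then stops new ones.
void WaitBeforeExit( UnloadGuard &g )
{
  std::unique_lock<std::shared_mutex> scope( g.lock );
  g.unloaded = true;
}
```

Because the exclusive lock cannot be granted while any shared lock is held, returning from `WaitBeforeExit` implies that no reader is still inside the guarded region, which is what makes it safe to proceed with exit.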

Friends And Related Function Documentation

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.


The documentation for this class was generated from the following files: