XRootD
XrdClXRootDTransport.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClSocket.hh"
29 #include "XrdCl/XrdClMessage.hh"
30 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClSIDManager.hh"
32 #include "XrdCl/XrdClUtils.hh"
34 #include "XrdCl/XrdClTls.hh"
35 #include "XrdNet/XrdNetAddr.hh"
36 #include "XrdNet/XrdNetUtils.hh"
37 #include "XrdSys/XrdSysPlatform.hh"
38 #include "XrdOuc/XrdOucErrInfo.hh"
39 #include "XrdOuc/XrdOucUtils.hh"
40 #include "XrdOuc/XrdOucCRC.hh"
42 #include "XrdSys/XrdSysTimer.hh"
43 #include "XrdSys/XrdSysAtomics.hh"
44 #include "XrdSys/XrdSysPlugin.hh"
46 #include "XrdSec/XrdSecProtect.hh"
47 #include "XrdSys/XrdSysE2T.hh"
48 #include "XrdCl/XrdClTls.hh"
49 #include "XrdCl/XrdClSocket.hh"
50 #include "XProtocol/XProtocol.hh"
51 #include "XrdVersion.hh"
52 
53 #include <arpa/inet.h>
54 #include <sys/types.h>
55 #include <unistd.h>
56 #include <dlfcn.h>
57 #include <sstream>
58 #include <iomanip>
59 #include <set>
60 #include <limits>
61 
62 #include <atomic>
63 
65 
66 namespace XrdCl
67 {
69  {
70  PluginUnloadHandler() : unloaded( false ) { }
71 
72  static void UnloadHandler()
73  {
74  UnloadHandler( "root" );
75  UnloadHandler( "xroot" );
76  }
77 
78  static void UnloadHandler( const std::string &trProt )
79  {
81  TransportHandler *trHandler = trManager->GetHandler( trProt );
82  trHandler->WaitBeforeExit();
83  }
84 
85  void Register( const std::string &protocol )
86  {
87  XrdSysRWLockHelper scope( lock, false ); // obtain write lock
88  std::pair< std::set<std::string>::iterator, bool > ret = protocols.insert( protocol );
89  // if that's the first time we are using the protocol, the sec lib
90  // was just loaded so now's the time to register the atexit handler
91  if( ret.second )
92  {
93  atexit( UnloadHandler );
94  }
95  }
96 
98  bool unloaded;
99  std::set<std::string> protocols;
100  };
101 
102  //----------------------------------------------------------------------------
104  //----------------------------------------------------------------------------
106  {
107  //--------------------------------------------------------------------------
108  // Define the stream status for the link negotiation purposes
109  //--------------------------------------------------------------------------
111  {
120  Connected
121  };
122 
123  //--------------------------------------------------------------------------
124  // Constructor
125  //--------------------------------------------------------------------------
127  {
128  }
129 
131  uint8_t pathId;
132  };
133 
134  //----------------------------------------------------------------------------
136  //----------------------------------------------------------------------------
138  {
139  StreamSelector( uint16_t size )
140  {
141  //----------------------------------------------------------------------
142  // Subtract one because we shouldn't take into account the control
143  // stream.
144  //----------------------------------------------------------------------
145  strmqueues.resize( size - 1, 0 );
146  }
147 
148  //------------------------------------------------------------------------
149  // @param size : number of streams
150  //------------------------------------------------------------------------
151  void AdjustQueues( uint16_t size )
152  {
153  strmqueues.resize( size - 1, 0);
154  }
155 
156  //------------------------------------------------------------------------
157  // @param connected : bitarray stating if given sub-stream is connected
158  //
159  // @return : substream number
160  //------------------------------------------------------------------------
161  uint16_t Select( const std::vector<bool> &connected )
162  {
163  uint16_t ret = 0;
164  size_t minval = std::numeric_limits<size_t>::max();
165 
166  for( uint16_t i = 0; i < connected.size() && i < strmqueues.size(); ++i )
167  {
168  if( !connected[i] ) continue;
169 
170  if( strmqueues[i] < minval )
171  {
172  ret = i;
173  minval = strmqueues[i];
174  }
175  }
176 
177  ++strmqueues[ret];
178  return ret + 1;
179  }
180 
181  //--------------------------------------------------------------------------
182  // Update queue for given substream
183  //--------------------------------------------------------------------------
184  void MsgReceived( uint16_t substrm )
185  {
186  if( substrm > 0 )
187  --strmqueues[substrm - 1];
188  }
189 
190  private:
191 
192  std::vector<size_t> strmqueues;
193  };
194 
196  {
197  BindPrefSelector( std::vector<std::string> && bindprefs ) :
198  bindprefs( std::move( bindprefs ) ), next( 0 )
199  {
200  }
201 
202  inline const std::string& Get()
203  {
204  std::string &ret = bindprefs[next];
205  ++next;
206  if( next >= bindprefs.size() )
207  next = 0;
208  return ret;
209  }
210 
211  private:
212  std::vector<std::string> bindprefs;
213  size_t next;
214  };
215 
216  //----------------------------------------------------------------------------
218  //----------------------------------------------------------------------------
220  {
221  //--------------------------------------------------------------------------
222  // Constructor
223  //--------------------------------------------------------------------------
224  XRootDChannelInfo( const URL &url ):
225  serverFlags(0),
226  protocolVersion(0),
227  firstLogIn(true),
228  authBuffer(0),
229  authProtocol(0),
230  authParams(0),
231  authEnv(0),
232  finstcnt(0),
233  openFiles(0),
234  waitBarrier(0),
235  protection(0),
236  protRespBody(0),
237  protRespSize(0),
238  encrypted(false),
239  istpc(false)
240  {
242  memset( sessionId, 0, 16 );
243  memset( oldSessionId, 0, 16 );
244  }
245 
246  //--------------------------------------------------------------------------
247  // Destructor
248  //--------------------------------------------------------------------------
250  {
251  delete [] authBuffer;
252  }
253 
254  typedef std::vector<XRootDStreamInfo> StreamInfoVector;
255 
256  //--------------------------------------------------------------------------
257  // Data
258  //--------------------------------------------------------------------------
259  uint32_t serverFlags;
260  uint32_t protocolVersion;
261  uint8_t sessionId[16];
262  uint8_t oldSessionId[16];
264  std::shared_ptr<SIDManager> sidManager;
265  char *authBuffer;
270  std::string streamName;
271  std::string authProtocolName;
272  std::set<uint16_t> sentOpens;
273  std::set<uint16_t> sentCloses;
274  std::atomic<uint32_t> finstcnt; // file instance count
275  uint32_t openFiles;
276  time_t waitBarrier;
279  unsigned int protRespSize;
280  std::unique_ptr<StreamSelector> strmSelector;
281  bool encrypted;
282  bool istpc;
283  std::unique_ptr<BindPrefSelector> bindSelector;
284  std::string logintoken;
286  };
287 
288  //----------------------------------------------------------------------------
289  // Constructor
290  //----------------------------------------------------------------------------
292  pSecUnloadHandler( new PluginUnloadHandler() )
293  {
294  }
295 
296  //----------------------------------------------------------------------------
297  // Destructor
298  //----------------------------------------------------------------------------
300  {
301  delete pSecUnloadHandler; pSecUnloadHandler = 0;
302  }
303 
304  //----------------------------------------------------------------------------
305  // Read message header from socket
306  //----------------------------------------------------------------------------
308  {
309  //--------------------------------------------------------------------------
310  // A new message - allocate the space needed for the header
311  //--------------------------------------------------------------------------
312  if( message.GetCursor() == 0 && message.GetSize() < 8 )
313  message.Allocate( 8 );
314 
315  //--------------------------------------------------------------------------
316  // Read the message header
317  //--------------------------------------------------------------------------
318  if( message.GetCursor() < 8 )
319  {
320  size_t leftToBeRead = 8 - message.GetCursor();
321  while( leftToBeRead )
322  {
323  int bytesRead = 0;
324  XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325  leftToBeRead, bytesRead );
326  if( !status.IsOK() || status.code == suRetry )
327  return status;
328 
329  leftToBeRead -= bytesRead;
330  message.AdvanceCursor( bytesRead );
331  }
332  UnMarshallHeader( message );
333 
334  uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335  Log *log = DefaultEnv::GetLog();
336  log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337  "body", &message, bodySize );
338 
339  return XRootDStatus( stOK, suDone );
340  }
341  return XRootDStatus( stError, errInternal );
342  }
343 
344  //----------------------------------------------------------------------------
345  // Read message body from socket
346  //----------------------------------------------------------------------------
348  {
349  //--------------------------------------------------------------------------
350  // Retrieve the body
351  //--------------------------------------------------------------------------
352  size_t leftToBeRead = 0;
353  uint32_t bodySize = 0;
355  bodySize = rsphdr->dlen;
356 
357  if( message.GetSize() < bodySize + 8 )
358  message.ReAllocate( bodySize + 8 );
359 
360  leftToBeRead = bodySize-(message.GetCursor()-8);
361  while( leftToBeRead )
362  {
363  int bytesRead = 0;
364  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365 
366  if( !status.IsOK() || status.code == suRetry )
367  return status;
368 
369  leftToBeRead -= bytesRead;
370  message.AdvanceCursor( bytesRead );
371  }
372 
373  return XRootDStatus( stOK, suDone );
374  }
375 
376  //----------------------------------------------------------------------------
377  // Read more of the message body from socket
378  //----------------------------------------------------------------------------
380  {
382  if( rsphdr->status != kXR_status )
383  return XRootDStatus( stError, errInvalidOp );
384 
385  //--------------------------------------------------------------------------
386  // In case of non kXR_status responses we read all the response, including
387  // data. For kXR_status responses we first read only the remainder of the
388  // header. The header must then be unmarshalled, and then a second call to
389  // GetMore (repeated for suRetry as needed) will read the data.
390  //--------------------------------------------------------------------------
391 
392  uint32_t bodySize = rsphdr->dlen;
393  if( bodySize+8 < sizeof( ServerResponseStatus ) )
395  "kXR_status: invalid message size." );
396 
398  bodySize += rspst->bdy.dlen;
399 
400  if( message.GetSize() < bodySize + 8 )
401  message.ReAllocate( bodySize + 8 );
402 
403  size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404  while( leftToBeRead )
405  {
406  int bytesRead = 0;
407  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408 
409  if( !status.IsOK() || status.code == suRetry )
410  return status;
411 
412  leftToBeRead -= bytesRead;
413  message.AdvanceCursor( bytesRead );
414  }
415 
416  // Unmarshal the message body
417  Log *log = DefaultEnv::GetLog();
419  if( !st.IsOK() && st.code == errDataError )
420  {
421  log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422  st.GetErrorMessage().c_str() );
423  return st;
424  }
425 
426  if( !st.IsOK() )
427  {
428  log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429  &message );
430  return st;
431  }
432 
433  return XRootDStatus( stOK, suDone );
434  }
435 
436  //----------------------------------------------------------------------------
437  // Initialize channel
438  //----------------------------------------------------------------------------
440  AnyObject &channelData )
441  {
442  XRootDChannelInfo *info = new XRootDChannelInfo( url );
443  XrdSysMutexHelper scopedLock( info->mutex );
444  channelData.Set( info );
445 
446  Env *env = DefaultEnv::GetEnv();
447  int streams = DefaultSubStreamsPerChannel;
448  env->GetInt( "SubStreamsPerChannel", streams );
449  if( streams < 1 ) streams = 1;
450  info->stream.resize( streams );
451  info->strmSelector.reset( new StreamSelector( streams ) );
452  info->encrypted = url.IsSecure();
453  info->istpc = url.IsTPC();
454  info->logintoken = url.GetLoginToken();
455  }
456 
457  //----------------------------------------------------------------------------
458  // Finalize channel
459  //----------------------------------------------------------------------------
461  {
462  }
463 
464  //----------------------------------------------------------------------------
465  // HandShake
466  //----------------------------------------------------------------------------
468  AnyObject &channelData )
469  {
470  XRootDChannelInfo *info = 0;
471  channelData.Get( info );
472  XrdSysMutexHelper scopedLock( info->mutex );
473 
474  if( info->stream.size() <= handShakeData->subStreamId )
475  {
476  Log *log = DefaultEnv::GetLog();
478  "[%s] Internal error: not enough substreams",
479  handShakeData->streamName.c_str() );
480  return XRootDStatus( stFatal, errInternal );
481  }
482 
483  if( handShakeData->subStreamId == 0 )
484  {
485  info->streamName = handShakeData->streamName;
486  return HandShakeMain( handShakeData, channelData );
487  }
488  return HandShakeParallel( handShakeData, channelData );
489  }
490 
491  //----------------------------------------------------------------------------
492  // Hand shake the main stream
493  //----------------------------------------------------------------------------
494  XRootDStatus XRootDTransport::HandShakeMain( HandShakeData *handShakeData,
495  AnyObject &channelData )
496  {
497  XRootDChannelInfo *info = 0;
498  channelData.Get( info );
499  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
500 
501  //--------------------------------------------------------------------------
502  // First step - we need to create an initial handshake and send it out
503  //--------------------------------------------------------------------------
504  if( sInfo.status == XRootDStreamInfo::Disconnected ||
506  {
507  handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
510  return XRootDStatus( stOK, suContinue );
511  }
512 
513  //--------------------------------------------------------------------------
514  // Second step - we got the reply message to the initial handshake
515  //--------------------------------------------------------------------------
517  {
518  XRootDStatus st = ProcessServerHS( handShakeData, info );
519  if( st.IsOK() )
521  else
523  return st;
524  }
525 
526  //--------------------------------------------------------------------------
527  // Third step - we got the response to the protocol request, we need
528  // to process it and send out a login request
529  //--------------------------------------------------------------------------
531  {
532  XRootDStatus st = ProcessProtocolResp( handShakeData, info );
533 
534  if( !st.IsOK() )
535  {
537  return st;
538  }
539 
540  if( st.code == suRetry )
541  {
542  handShakeData->out = GenerateProtocol( handShakeData, info,
545  return XRootDStatus( stOK, suRetry );
546  }
547 
548  handShakeData->out = GenerateLogIn( handShakeData, info );
550  return XRootDStatus( stOK, suContinue );
551  }
552 
553  //--------------------------------------------------------------------------
554  // Fourth step - handle the log in response and proceed with the
555  // authentication if required by the server
556  //--------------------------------------------------------------------------
557  if( sInfo.status == XRootDStreamInfo::LoginSent )
558  {
559  XRootDStatus st = ProcessLogInResp( handShakeData, info );
560 
561  if( !st.IsOK() )
562  {
564  return st;
565  }
566 
567  if( st.IsOK() && st.code == suDone )
568  {
569  //----------------------------------------------------------------------
570  // If it's not our first log in we need to end the previous session
571  // to make sure that the server noticed our disconnection and closed
572  // all the writable handles that we owned
573  //----------------------------------------------------------------------
574  if( !info->firstLogIn )
575  {
576  handShakeData->out = GenerateEndSession( handShakeData, info );
578  return XRootDStatus( stOK, suContinue );
579  }
580 
582  info->firstLogIn = false;
583  return st;
584  }
585 
586  st = DoAuthentication( handShakeData, info );
587  if( !st.IsOK() )
589  else
591  return st;
592  }
593 
594  //--------------------------------------------------------------------------
595  // Fifth step and later - proceed with the authentication
596  //--------------------------------------------------------------------------
597  if( sInfo.status == XRootDStreamInfo::AuthSent )
598  {
599  XRootDStatus st = DoAuthentication( handShakeData, info );
600 
601  if( !st.IsOK() )
602  {
604  return st;
605  }
606 
607  if( st.IsOK() && st.code == suDone )
608  {
609  //----------------------------------------------------------------------
610  // If it's not our first log in we need to end the previous session
611  //----------------------------------------------------------------------
612  if( !info->firstLogIn )
613  {
614  handShakeData->out = GenerateEndSession( handShakeData, info );
616  return XRootDStatus( stOK, suContinue );
617  }
618 
620  info->firstLogIn = false;
621  return st;
622  }
623 
624  return st;
625  }
626 
627  //--------------------------------------------------------------------------
628  // The last step - kXR_endsess returned
629  //--------------------------------------------------------------------------
631  {
632  XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
633 
634  if( st.IsOK() && st.code == suDone )
635  {
637  }
638  else if( !st.IsOK() )
639  {
641  }
642 
643  return st;
644  }
645 
646  return XRootDStatus( stOK, suDone );
647  }
648 
649  //----------------------------------------------------------------------------
650  // Hand shake parallel stream
651  //----------------------------------------------------------------------------
652  XRootDStatus XRootDTransport::HandShakeParallel( HandShakeData *handShakeData,
653  AnyObject &channelData )
654  {
655  XRootDChannelInfo *info = 0;
656  channelData.Get( info );
657 
658  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
659 
660  //--------------------------------------------------------------------------
661  // First step - we need to create an initial handshake and send it out
662  //--------------------------------------------------------------------------
663  if( sInfo.status == XRootDStreamInfo::Disconnected ||
664  sInfo.status == XRootDStreamInfo::Broken )
665  {
666  handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
668  sInfo.status = XRootDStreamInfo::HandShakeSent;
669  return XRootDStatus( stOK, suContinue );
670  }
671 
672  //--------------------------------------------------------------------------
673  // Second step - we got the reply message to the initial handshake,
674  // if successful we need to send bind
675  //--------------------------------------------------------------------------
676  if( sInfo.status == XRootDStreamInfo::HandShakeSent )
677  {
678  XRootDStatus st = ProcessServerHS( handShakeData, info );
679  if( st.IsOK() )
681  else
682  sInfo.status = XRootDStreamInfo::Broken;
683  return st;
684  }
685 
686  //--------------------------------------------------------------------------
687  // Second step bis - we got the response to the protocol request, we need
688  // to process it and send out a bind request
689  //--------------------------------------------------------------------------
690  if( sInfo.status == XRootDStreamInfo::HandShakeReceived )
691  {
692  XRootDStatus st = ProcessProtocolResp( handShakeData, info );
693 
694  if( !st.IsOK() )
695  {
696  sInfo.status = XRootDStreamInfo::Broken;
697  return st;
698  }
699 
700  handShakeData->out = GenerateBind( handShakeData, info );
701  sInfo.status = XRootDStreamInfo::BindSent;
702  return XRootDStatus( stOK, suContinue );
703  }
704 
705  //--------------------------------------------------------------------------
706  // Third step - we got the response to the kXR_bind
707  //--------------------------------------------------------------------------
708  if( sInfo.status == XRootDStreamInfo::BindSent )
709  {
710  XRootDStatus st = ProcessBindResp( handShakeData, info );
711 
712  if( !st.IsOK() )
713  {
714  sInfo.status = XRootDStreamInfo::Broken;
715  return st;
716  }
717  sInfo.status = XRootDStreamInfo::Connected;
718  return XRootDStatus();
719  }
720  return XRootDStatus();
721  }
722 
723  //------------------------------------------------------------------------
724  // @return true if handshake has been done and stream is connected,
725  // false otherwise
726  //------------------------------------------------------------------------
728  AnyObject &channelData )
729  {
730  XRootDChannelInfo *info = 0;
731  channelData.Get( info );
732  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
733  return ( sInfo.status == XRootDStreamInfo::Connected );
734  }
735 
736  //----------------------------------------------------------------------------
737  // Check if the stream should be disconnected
738  //----------------------------------------------------------------------------
739  bool XRootDTransport::IsStreamTTLElapsed( time_t inactiveTime,
740  AnyObject &channelData )
741  {
742  XRootDChannelInfo *info = 0;
743  channelData.Get( info );
744  Env *env = DefaultEnv::GetEnv();
745  Log *log = DefaultEnv::GetLog();
746 
747  //--------------------------------------------------------------------------
748  // Check the TTL settings for the current server
749  //--------------------------------------------------------------------------
750  int ttl;
751  if( info->serverFlags & kXR_isServer )
752  {
753  ttl = DefaultDataServerTTL;
754  env->GetInt( "DataServerTTL", ttl );
755  }
756  else
757  {
759  env->GetInt( "LoadBalancerTTL", ttl );
760  }
761 
762  //--------------------------------------------------------------------------
763  // See whether we can give a go-ahead for the disconnection
764  //--------------------------------------------------------------------------
765  XrdSysMutexHelper scopedLock( info->mutex );
766  uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
767  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
768  "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769  info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
770  info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
771 
772  if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
773  return false;
774 
775  if( !allocatedSIDs && inactiveTime > ttl )
776  return true;
777 
778  return false;
779  }
780 
781  //----------------------------------------------------------------------------
782  // Check the stream is broken - ie. TCP connection got broken and
783  // went undetected by the TCP stack
784  //----------------------------------------------------------------------------
786  AnyObject &channelData )
787  {
788  XRootDChannelInfo *info = 0;
789  channelData.Get( info );
790  Env *env = DefaultEnv::GetEnv();
791  Log *log = DefaultEnv::GetLog();
792 
793  int streamTimeout = DefaultStreamTimeout;
794  env->GetInt( "StreamTimeout", streamTimeout );
795 
796  XrdSysMutexHelper scopedLock( info->mutex );
797 
798  const time_t now = time(0);
799  const bool anySID =
800  info->sidManager->IsAnySIDOldAs( now - streamTimeout );
801 
802  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
803  "stream timeout: %d, any SID: %d, wait barrier: %s",
804  info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
805  anySID, Utils::TimeToString(info->waitBarrier).c_str() );
806 
807  if( inactiveTime < streamTimeout )
808  return Status();
809 
810  if( now < info->waitBarrier )
811  return Status();
812 
813  if( !anySID )
814  return Status();
815 
816  return Status( stError, errSocketTimeout );
817  }
818 
819  //----------------------------------------------------------------------------
820  // Multiplex
821  //----------------------------------------------------------------------------
823  {
824  return PathID( 0, 0 );
825  }
826 
827  //----------------------------------------------------------------------------
828  // Multiplex
829  //----------------------------------------------------------------------------
831  AnyObject &channelData,
832  PathID *hint )
833  {
834  XRootDChannelInfo *info = 0;
835  channelData.Get( info );
836  XrdSysMutexHelper scopedLock( info->mutex );
837 
838  //--------------------------------------------------------------------------
839  // If we're not connected to a data server or we don't know that yet
840  // we stream through 0
841  //--------------------------------------------------------------------------
842  if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
843  return PathID( 0, 0 );
844 
845  //--------------------------------------------------------------------------
846  // Select the streams
847  //--------------------------------------------------------------------------
848  Log *log = DefaultEnv::GetLog();
849  uint16_t upStream = 0;
850  uint16_t downStream = 0;
851 
852  if( hint )
853  {
854  upStream = hint->up;
855  downStream = hint->down;
856  }
857  else
858  {
859  upStream = 0;
860  std::vector<bool> connected;
861  connected.reserve( info->stream.size() - 1 );
862  size_t nbConnected = 0;
863  for( size_t i = 1; i < info->stream.size(); ++i )
864  if( info->stream[i].status == XRootDStreamInfo::Connected )
865  {
866  connected.push_back( true );
867  ++nbConnected;
868  }
869  else
870  connected.push_back( false );
871 
872  if( nbConnected == 0 )
873  downStream = 0;
874  else
875  downStream = info->strmSelector->Select( connected );
876  }
877 
878  if( upStream >= info->stream.size() )
879  {
881  "[%s] Up link stream %d does not exist, using 0",
882  info->streamName.c_str(), upStream );
883  upStream = 0;
884  }
885 
886  if( downStream >= info->stream.size() )
887  {
889  "[%s] Down link stream %d does not exist, using 0",
890  info->streamName.c_str(), downStream );
891  downStream = 0;
892  }
893 
894  //--------------------------------------------------------------------------
895  // Modify the message
896  //--------------------------------------------------------------------------
897  UnMarshallRequest( msg );
899  switch( hdr->requestid )
900  {
901  //------------------------------------------------------------------------
902  // Read - we update the path id to tell the server where we want to
903  // get the response, but we still send the request through stream 0
904  // We need to allocate space for read_args if we don't have it
905  // included yet
906  //------------------------------------------------------------------------
907  case kXR_read:
908  {
909  if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
910  {
911  msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
912  void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
913  memset( newBuf, 0, 8 );
915  req->dlen += 8;
916  }
917  read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
918  args->pathid = info->stream[downStream].pathId;
919  break;
920  }
921 
922 
923  //------------------------------------------------------------------------
924  // PgRead - we update the path id to tell the server where we want to
925  // get the response, but we still send the request through stream 0
926  // We need to allocate space for ClientPgReadReqArgs if we don't have it
927  // included yet
928  //------------------------------------------------------------------------
929  case kXR_pgread:
930  {
931  if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
932  {
933  msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
934  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
935  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
937  req->dlen += sizeof( ClientPgReadReqArgs );
938  }
939  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
940  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
941  args->pathid = info->stream[downStream].pathId;
942  break;
943  }
944 
945  //------------------------------------------------------------------------
946  // ReadV - the situation is identical to read but we don't need any
947  // additional structures to specify the return path
948  //------------------------------------------------------------------------
949  case kXR_readv:
950  {
952  req->pathid = info->stream[downStream].pathId;
953  break;
954  }
955 
956  //------------------------------------------------------------------------
957  // Write - multiplexing writes doesn't work properly in the server
958  //------------------------------------------------------------------------
959  case kXR_write:
960  {
961 // ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
962 // req->pathid = info->stream[downStream].pathId;
963  break;
964  }
965 
966  //------------------------------------------------------------------------
967  // WriteV - multiplexing writes doesn't work properly in the server
968  //------------------------------------------------------------------------
969  case kXR_writev:
970  {
971 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
972 // req->pathid = info->stream[downStream].pathId;
973  break;
974  }
975 
976  //------------------------------------------------------------------------
977  // PgWrite - multiplexing writes doesn't work properly in the server
978  //------------------------------------------------------------------------
979  case kXR_pgwrite:
980  {
981 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
982 // req->pathid = info->stream[downStream].pathId;
983  break;
984  }
985  };
986  MarshallRequest( msg );
987  return PathID( upStream, downStream );
988  }
989 
990  //----------------------------------------------------------------------------
991  // Return a number of substreams per stream that should be created
992  // This depends on the environment and whether we are connected to
993  // a data server or not
994  //----------------------------------------------------------------------------
996  {
997  XRootDChannelInfo *info = 0;
998  channelData.Get( info );
999  XrdSysMutexHelper scopedLock( info->mutex );
1000 
1001  //--------------------------------------------------------------------------
1002  // If the connection has been opened in order to orchestrate a TPC or
1003  // the remote server is a Manager or Metamanager we will need only one
1004  // (control) stream.
1005  //--------------------------------------------------------------------------
1006  if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1007 
1008  //--------------------------------------------------------------------------
1009  // Number of streams requested by user
1010  //--------------------------------------------------------------------------
1011  uint16_t ret = info->stream.size();
1012 
1014  int nodata = DefaultTlsNoData;
1015  env->GetInt( "TlsNoData", nodata );
1016 
1017  // Does the server require the stream 0 to be encrypted?
1018  bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1019  ( info->serverFlags & kXR_tlsLogin ) ||
1020  ( info->serverFlags & kXR_tlsSess );
1021  // Does the server NOT require the data streams to be encrypted?
1022  bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1023  // Does the user require the stream 0 to be encrypted?
1024  bool usrTlsStrm0 = info->encrypted;
1025  // Does the user NOT require the data streams to be encrypted?
1026  bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1027 
1028  if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029  ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1030  {
1031  //------------------------------------------------------------------------
1032  // The server or user asked us to encrypt stream 0, but to send the data
1033  // (read/write) using a plain TCP connection
1034  //------------------------------------------------------------------------
1035  if( ret == 1 ) ++ret;
1036  }
1037 
1038  if( ret > info->stream.size() )
1039  {
1040  info->stream.resize( ret );
1041  info->strmSelector->AdjustQueues( ret );
1042  }
1043 
1044  return ret;
1045  }
1046 
1047  //----------------------------------------------------------------------------
1048  // Marshall
1049  //----------------------------------------------------------------------------
1051  {
1052  ClientRequest *req = (ClientRequest*)msg;
1053  switch( req->header.requestid )
1054  {
1055  //------------------------------------------------------------------------
1056  // kXR_protocol
1057  //------------------------------------------------------------------------
1058  case kXR_protocol:
1059  req->protocol.clientpv = htonl( req->protocol.clientpv );
1060  break;
1061 
1062  //------------------------------------------------------------------------
1063  // kXR_login
1064  //------------------------------------------------------------------------
1065  case kXR_login:
1066  req->login.pid = htonl( req->login.pid );
1067  break;
1068 
1069  //------------------------------------------------------------------------
1070  // kXR_locate
1071  //------------------------------------------------------------------------
1072  case kXR_locate:
1073  req->locate.options = htons( req->locate.options );
1074  break;
1075 
1076  //------------------------------------------------------------------------
1077  // kXR_query
1078  //------------------------------------------------------------------------
1079  case kXR_query:
1080  req->query.infotype = htons( req->query.infotype );
1081  break;
1082 
1083  //------------------------------------------------------------------------
1084  // kXR_truncate
1085  //------------------------------------------------------------------------
1086  case kXR_truncate:
1087  req->truncate.offset = htonll( req->truncate.offset );
1088  break;
1089 
1090  //------------------------------------------------------------------------
1091  // kXR_mkdir
1092  //------------------------------------------------------------------------
1093  case kXR_mkdir:
1094  req->mkdir.mode = htons( req->mkdir.mode );
1095  break;
1096 
1097  //------------------------------------------------------------------------
1098  // kXR_chmod
1099  //------------------------------------------------------------------------
1100  case kXR_chmod:
1101  req->chmod.mode = htons( req->chmod.mode );
1102  break;
1103 
1104  //------------------------------------------------------------------------
1105  // kXR_open
1106  //------------------------------------------------------------------------
1107  case kXR_open:
1108  req->open.mode = htons( req->open.mode );
1109  req->open.options = htons( req->open.options );
1110  break;
1111 
1112  //------------------------------------------------------------------------
1113  // kXR_read
1114  //------------------------------------------------------------------------
1115  case kXR_read:
1116  req->read.offset = htonll( req->read.offset );
1117  req->read.rlen = htonl( req->read.rlen );
1118  break;
1119 
1120  //------------------------------------------------------------------------
1121  // kXR_write
1122  //------------------------------------------------------------------------
1123  case kXR_write:
1124  req->write.offset = htonll( req->write.offset );
1125  break;
1126 
1127  //------------------------------------------------------------------------
1128  // kXR_mv
1129  //------------------------------------------------------------------------
1130  case kXR_mv:
1131  req->mv.arg1len = htons( req->mv.arg1len );
1132  break;
1133 
1134  //------------------------------------------------------------------------
1135  // kXR_readv
1136  //------------------------------------------------------------------------
1137  case kXR_readv:
1138  {
1139  uint16_t numChunks = (req->readv.dlen)/16;
1140  readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1141  for( size_t i = 0; i < numChunks; ++i )
1142  {
1143  dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1144  dataChunk[i].offset = htonll( dataChunk[i].offset );
1145  }
1146  break;
1147  }
1148 
1149  //------------------------------------------------------------------------
1150  // kXR_writev
1151  //------------------------------------------------------------------------
1152  case kXR_writev:
1153  {
1154  uint16_t numChunks = (req->writev.dlen)/16;
1155  XrdProto::write_list *wrtList =
1156  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1157  for( size_t i = 0; i < numChunks; ++i )
1158  {
1159  wrtList[i].wlen = htonl( wrtList[i].wlen );
1160  wrtList[i].offset = htonll( wrtList[i].offset );
1161  }
1162 
1163  break;
1164  }
1165 
1166  case kXR_pgread:
1167  {
1168  req->pgread.offset = htonll( req->pgread.offset );
1169  req->pgread.rlen = htonl( req->pgread.rlen );
1170  break;
1171  }
1172 
1173  case kXR_pgwrite:
1174  {
1175  req->pgwrite.offset = htonll( req->pgwrite.offset );
1176  break;
1177  }
1178 
1179  //------------------------------------------------------------------------
1180  // kXR_prepare
1181  //------------------------------------------------------------------------
1182  case kXR_prepare:
1183  {
1184  req->prepare.optionX = htons( req->prepare.optionX );
1185  req->prepare.port = htons( req->prepare.port );
1186  break;
1187  }
1188 
1189  case kXR_chkpoint:
1190  {
1191  if( req->chkpoint.opcode == kXR_ckpXeq )
1192  MarshallRequest( msg + 24 );
1193  break;
1194  }
1195  };
1196 
1197  req->header.requestid = htons( req->header.requestid );
1198  req->header.dlen = htonl( req->header.dlen );
1199  return XRootDStatus();
1200  }
1201 
1202  //----------------------------------------------------------------------------
1203  // Unmarshall the request - sometimes the requests need to be rewritten,
1204  // so we need to unmarshall them
1205  //----------------------------------------------------------------------------
1207  {
1208  if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1209  // We rely on the marshaling process to be symmetric!
1210  // First we unmarshall the request ID and the length because
1211  // MarshallRequest() relies on these, and then we need to unmarshall these
1212  // two again, because they get marshalled in MarshallRequest().
1213  // All this is pretty damn ugly and should be rewritten.
1214  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1215  req->header.requestid = htons( req->header.requestid );
1216  req->header.dlen = htonl( req->header.dlen );
1217  XRootDStatus st = MarshallRequest( msg );
1218  req->header.requestid = htons( req->header.requestid );
1219  req->header.dlen = htonl( req->header.dlen );
1220  msg->SetIsMarshalled( false );
1221  return st;
1222  }
1223 
1224  //----------------------------------------------------------------------------
1225  // Unmarshall the body of the incoming message
1226  //----------------------------------------------------------------------------
1228  {
1229  ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1230 
1231  //--------------------------------------------------------------------------
1232  // kXR_ok
1233  //--------------------------------------------------------------------------
1234  if( m->hdr.status == kXR_ok )
1235  {
1236  switch( reqType )
1237  {
1238  //----------------------------------------------------------------------
1239  // kXR_protocol
1240  //----------------------------------------------------------------------
1241  case kXR_protocol:
1242  if( m->hdr.dlen < 8 )
1243  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1244  m->body.protocol.pval = ntohl( m->body.protocol.pval );
1245  m->body.protocol.flags = ntohl( m->body.protocol.flags );
1246  break;
1247  }
1248  }
1249  //--------------------------------------------------------------------------
1250  // kXR_error
1251  //--------------------------------------------------------------------------
1252  else if( m->hdr.status == kXR_error )
1253  {
1254  if( m->hdr.dlen < 4 )
1255  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1256  m->body.error.errnum = ntohl( m->body.error.errnum );
1257  }
1258 
1259  //--------------------------------------------------------------------------
1260  // kXR_wait
1261  //--------------------------------------------------------------------------
1262  else if( m->hdr.status == kXR_wait )
1263  {
1264  if( m->hdr.dlen < 4 )
1265  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1266  m->body.wait.seconds = htonl( m->body.wait.seconds );
1267  }
1268 
1269  //--------------------------------------------------------------------------
1270  // kXR_redirect
1271  //--------------------------------------------------------------------------
1272  else if( m->hdr.status == kXR_redirect )
1273  {
1274  if( m->hdr.dlen < 4 )
1275  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1276  m->body.redirect.port = htonl( m->body.redirect.port );
1277  }
1278 
1279  //--------------------------------------------------------------------------
1280  // kXR_waitresp
1281  //--------------------------------------------------------------------------
1282  else if( m->hdr.status == kXR_waitresp )
1283  {
1284  if( m->hdr.dlen < 4 )
1285  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1286  m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1287  }
1288 
1289  //--------------------------------------------------------------------------
1290  // kXR_attn
1291  //--------------------------------------------------------------------------
1292  else if( m->hdr.status == kXR_attn )
1293  {
1294  if( m->hdr.dlen < 4 )
1295  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1296  m->body.attn.actnum = htonl( m->body.attn.actnum );
1297  }
1298 
1299  return XRootDStatus();
1300  }
1301 
1302  //------------------------------------------------------------------------
1304  //------------------------------------------------------------------------
1306  {
1307  //--------------------------------------------------------------------------
1308  // Calculate the crc32c before unmarshalling the body!
1309  //--------------------------------------------------------------------------
1311  char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1312  size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1313  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1314 
1315  size_t stlen = sizeof( ServerResponseStatus );
1316  switch( reqType )
1317  {
1318  case kXR_pgread:
1319  {
1320  stlen += sizeof( ServerResponseBody_pgRead );
1321  break;
1322  }
1323 
1324  case kXR_pgwrite:
1325  {
1326  stlen += sizeof( ServerResponseBody_pgWrite );
1327  break;
1328  }
1329  }
1330 
1331  if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1332  "kXR_status: invalid message size." );
1333 
1334  rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1335  rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1336 
1337  switch( reqType )
1338  {
1339  case kXR_pgread:
1340  {
1342  pgrdbdy->offset = ntohll( pgrdbdy->offset );
1343  break;
1344  }
1345 
1346  case kXR_pgwrite:
1347  {
1349  pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1350  break;
1351  }
1352  }
1353 
1354  //--------------------------------------------------------------------------
1355  // Do the integrity checks
1356  //--------------------------------------------------------------------------
1357  if( crcval != rspst->bdy.crc32c )
1358  {
1359  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1360  "corrupted (crc32c integrity check failed)." );
1361  }
1362 
1363  if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1364  rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1365  {
1366  return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1367  "(stream ID mismatch)." );
1368  }
1369 
1370 
1371 
1372  if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1373  {
1374  return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1375  "(request ID mismatch)." );
1376  }
1377 
1378  return XRootDStatus();
1379  }
1380 
1382  {
1384  uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1385 
1386  switch( reqType )
1387  {
1388  case kXR_pgwrite:
1389  {
1390  //--------------------------------------------------------------------------
1391  // If there's no additional data there's nothing to unmarshal
1392  //--------------------------------------------------------------------------
1393  if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1394  //--------------------------------------------------------------------------
1395  // If there's not enough data to form correction-segment report an error
1396  //--------------------------------------------------------------------------
1397  if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1399  "kXR_status: invalid message size." );
1400 
1401  //--------------------------------------------------------------------------
1402  // Calculate the crc32c for the additional data
1403  //--------------------------------------------------------------------------
1405  cse->cseCRC = ntohl( cse->cseCRC );
1406  size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1407  void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1408  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1409 
1410  //--------------------------------------------------------------------------
1411  // Do the integrity checks
1412  //--------------------------------------------------------------------------
1413  if( crcval != cse->cseCRC )
1414  {
1415  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1416  "corrupted (crc32c integrity check failed)." );
1417  }
1418 
1419  cse->dlFirst = ntohs( cse->dlFirst );
1420  cse->dlLast = ntohs( cse->dlLast );
1421 
1422  size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1423  sizeof( kXR_int64 );
1424  kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1425  sizeof( ServerResponseBody_pgWrCSE ) );
1426 
1427  for( size_t i = 0; i < pgcnt; ++i )
1428  pgoffs[i] = ntohll( pgoffs[i] );
1429 
1430  return XRootDStatus();
1431  break;
1432  }
1433 
1434  default:
1435  break;
1436  }
1437 
1439  }
1440 
1441  //----------------------------------------------------------------------------
1442  // Unmarshall the header of the incoming message
1443  //----------------------------------------------------------------------------
1445  {
1447  header->status = ntohs( header->status );
1448  header->dlen = ntohl( header->dlen );
1449  }
1450 
1451  //----------------------------------------------------------------------------
1452  // Log server error response
1453  //----------------------------------------------------------------------------
1455  {
1456  Log *log = DefaultEnv::GetLog();
1457  ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1458  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1459  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1460  log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1461  rsp->body.error.errnum, errmsg );
1462  delete [] errmsg;
1463  }
1464 
1465  //------------------------------------------------------------------------
1466  // Number of currently connected data streams
1467  //------------------------------------------------------------------------
1469  {
1470  XRootDChannelInfo *info = 0;
1471  channelData.Get( info );
1472  XrdSysMutexHelper scopedLock( info->mutex );
1473 
1474  uint16_t nbConnected = 0;
1475  for( size_t i = 1; i < info->stream.size(); ++i )
1476  if( info->stream[i].status == XRootDStreamInfo::Connected )
1477  ++nbConnected;
1478 
1479  return nbConnected;
1480  }
1481 
1482  //----------------------------------------------------------------------------
1483  // The stream has been disconnected, do the cleanups
1484  //----------------------------------------------------------------------------
1486  uint16_t subStreamId )
1487  {
1488  XRootDChannelInfo *info = 0;
1489  channelData.Get( info );
1490  XrdSysMutexHelper scopedLock( info->mutex );
1491 
1492  CleanUpProtection( info );
1493 
1494  if( !info->stream.empty() )
1495  {
1496  XRootDStreamInfo &sInfo = info->stream[subStreamId];
1498  }
1499 
1500  if( subStreamId == 0 )
1501  {
1502  info->sidManager->ReleaseAllTimedOut();
1503  info->sentOpens.clear();
1504  info->sentCloses.clear();
1505  info->openFiles = 0;
1506  info->waitBarrier = 0;
1507  }
1508  }
1509 
1510  //------------------------------------------------------------------------
1511  // Query the channel
1512  //------------------------------------------------------------------------
1514  AnyObject &result,
1515  AnyObject &channelData )
1516  {
1517  XRootDChannelInfo *info = 0;
1518  channelData.Get( info );
1519  XrdSysMutexHelper scopedLock( info->mutex );
1520 
1521  switch( query )
1522  {
1523  //------------------------------------------------------------------------
1524  // Protocol name
1525  //------------------------------------------------------------------------
1526  case TransportQuery::Name:
1527  result.Set( (const char*)"XRootD", false );
1528  return Status();
1529 
1530  //------------------------------------------------------------------------
1531  // Authentication
1532  //------------------------------------------------------------------------
1533  case TransportQuery::Auth:
1534  result.Set( new std::string( info->authProtocolName ), false );
1535  return Status();
1536 
1537  //------------------------------------------------------------------------
1538  // Server flags
1539  //------------------------------------------------------------------------
1541  result.Set( new int( info->serverFlags ), false );
1542  return Status();
1543 
1544  //------------------------------------------------------------------------
1545  // Protocol version
1546  //------------------------------------------------------------------------
1548  result.Set( new int( info->protocolVersion ), false );
1549  return Status();
1550 
1552  result.Set( new bool( info->encrypted ), false );
1553  return Status();
1554  };
1555  return Status( stError, errQueryNotSupported );
1556  }
1557 
1558  //----------------------------------------------------------------------------
1559  // Check whether the transport can hijack the message
1560  //----------------------------------------------------------------------------
1562  uint16_t subStream,
1563  AnyObject &channelData )
1564  {
1565  XRootDChannelInfo *info = 0;
1566  channelData.Get( info );
1567  XrdSysMutexHelper scopedLock( info->mutex );
1568  Log *log = DefaultEnv::GetLog();
1569 
1570  //--------------------------------------------------------------------------
1571  // Update the substream queues
1572  //--------------------------------------------------------------------------
1573  info->strmSelector->MsgReceived( subStream );
1574 
1575  //--------------------------------------------------------------------------
1576  // Check whether this message is a response to a request that has
1577  // timed out, and if so, drop it
1578  //--------------------------------------------------------------------------
1579  ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1580  if( rsp->hdr.status == kXR_attn )
1581  {
1582  return NoAction;
1583  }
1584 
1585  if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1586  {
1587  log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1588  "response that we're no longer interested in (timed out)",
1589  &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1590  //------------------------------------------------------------------------
1591  // If it is kXR_waitresp there will be another one,
1592  // so we don't release the sid yet
1593  //------------------------------------------------------------------------
1594  if( rsp->hdr.status != kXR_waitresp )
1595  info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1596  //------------------------------------------------------------------------
1597  // If it is a successful response to an open request
1598  // that timed out, we need to send a close
1599  //------------------------------------------------------------------------
1600  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1601  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1602  if( sidIt != info->sentOpens.end() )
1603  {
1604  info->sentOpens.erase( sidIt );
1605  if( rsp->hdr.status == kXR_ok ) return RequestClose;
1606  }
1607  return DigestMsg;
1608  }
1609 
1610  //--------------------------------------------------------------------------
1611  // If we have a wait or waitresp
1612  //--------------------------------------------------------------------------
1613  uint32_t seconds = 0;
1614  if( rsp->hdr.status == kXR_wait )
1615  seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1616  // to re-send the request
1617  else if( rsp->hdr.status == kXR_waitresp )
1618  {
1619  seconds = ntohl( rsp->body.waitresp.seconds );
1620 
1621  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1622  "setting up wait barrier.",
1623  info->streamName.c_str(),
1624  seconds );
1625  }
1626 
1627  time_t barrier = time(0) + seconds;
1628  if( info->waitBarrier < barrier )
1629  info->waitBarrier = barrier;
1630 
1631  //--------------------------------------------------------------------------
1632  // If we got a response to an open request, we may need to bump the counter
1633  // of open files
1634  //--------------------------------------------------------------------------
1635  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1636  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1637  if( sidIt != info->sentOpens.end() )
1638  {
1639  if( rsp->hdr.status == kXR_waitresp )
1640  return NoAction;
1641  info->sentOpens.erase( sidIt );
1642  if( rsp->hdr.status == kXR_ok )
1643  {
1644  ++info->openFiles;
1645  info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1646  }
1647  return NoAction;
1648  }
1649 
1650  //--------------------------------------------------------------------------
1651  // If we got a response to a close, we may need to decrement the counter of
1652  // open files
1653  //--------------------------------------------------------------------------
1654  sidIt = info->sentCloses.find( sid );
1655  if( sidIt != info->sentCloses.end() )
1656  {
1657  if( rsp->hdr.status == kXR_waitresp )
1658  return NoAction;
1659  info->sentCloses.erase( sidIt );
1660  --info->openFiles;
1661  return NoAction;
1662  }
1663  return NoAction;
1664  }
1665 
1666  //----------------------------------------------------------------------------
1667  // Notify the transport about a message having been sent
1668  //----------------------------------------------------------------------------
1670  uint16_t subStream,
1671  uint32_t bytesSent,
1672  AnyObject &channelData )
1673  {
1674  // Called when a message has been sent. For messages that return on a
1675  // different pathid (and hence may use a different poller) it is possible
1676  // that the server has already replied and the reply will trigger
1677  // MessageReceived() before this method has been called. However for open
1678  // and close this is never the case and this method is used for tracking
1679  // only those.
1680  XRootDChannelInfo *info = 0;
1681  channelData.Get( info );
1682  XrdSysMutexHelper scopedLock( info->mutex );
1683  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1684  uint16_t reqid = ntohs( req->header.requestid );
1685 
1686 
1687  //--------------------------------------------------------------------------
1688  // We need to track opens to know if we can close streams due to idleness
1689  //--------------------------------------------------------------------------
1690  uint16_t sid;
1691  memcpy( &sid, req->header.streamid, 2 );
1692 
1693  if( reqid == kXR_open )
1694  info->sentOpens.insert( sid );
1695  else if( reqid == kXR_close )
1696  info->sentCloses.insert( sid );
1697  }
1698 
1699 
1700  //----------------------------------------------------------------------------
1701  // Get signature for given message
1702  //----------------------------------------------------------------------------
1704  {
1705  XRootDChannelInfo *info = 0;
1706  channelData.Get( info );
1707  return GetSignature( toSign, sign, info );
1708  }
1709 
1710  //------------------------------------------------------------------------
1712  //------------------------------------------------------------------------
1714  Message *&sign,
1715  XRootDChannelInfo *info )
1716  {
1717  XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1718  if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1719 
1720  ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1721  if( !info ) return Status( stError, errInternal );
1722  if( info->protection )
1723  {
1724  SecurityRequest *newreq = 0;
1725  // check if we have to secure the request in the first place
1726  if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1727  // secure (sign/encrypt) the request
1728  int rc = info->protection->Secure( newreq, *thereq, 0 );
1729  // there was an error
1730  if( rc < 0 )
1731  return Status( stError, errInternal, -rc );
1732 
1733  sign = new Message();
1734  sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1735  }
1736 
1737  return Status();
1738  }
1739 
1740  //------------------------------------------------------------------------
1742  //------------------------------------------------------------------------
1744  {
1745  XRootDChannelInfo *info = 0;
1746  channelData.Get( info );
1747  if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1748  info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1749  }
1750 
1751  //----------------------------------------------------------------------------
1752  // Wait before exit
1753  //----------------------------------------------------------------------------
1755  {
1756  XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1757  pSecUnloadHandler->unloaded = true;
1758  }
1759 
1760  //----------------------------------------------------------------------------
1761  // @return : true if encryption should be turned on, false otherwise
1762  //----------------------------------------------------------------------------
1764  AnyObject &channelData )
1765  {
1766  XRootDChannelInfo *info = 0;
1767  channelData.Get( info );
1768 
1770  int notlsok = DefaultNoTlsOK;
1771  env->GetInt( "NoTlsOK", notlsok );
1772 
1773  if( notlsok )
1774  return info->encrypted;
1775 
1776  // Did the server instruct us to switch to TLS right away?
1777  if( info->serverFlags & kXR_gotoTLS )
1778  {
1779  info->encrypted = true;
1780  return true ;
1781  }
1782 
1783  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1784 
1785  //--------------------------------------------------------------------------
1786  // The control stream (sub-stream 0) might need to switch to TLS before
1787  // login or after login
1788  //--------------------------------------------------------------------------
1789  if( handShakeData->subStreamId == 0 )
1790  {
1791  //------------------------------------------------------------------------
1792  // We are about to login and the server asked to start encrypting
1793  // before login
1794  //------------------------------------------------------------------------
1795  if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1796  ( info->serverFlags & kXR_tlsLogin ) )
1797  {
1798  info->encrypted = true;
1799  return true;
1800  }
1801 
1802  //--------------------------------------------------------------------
1803  // The hand-shake is done and the server requested to encrypt the session
1804  //--------------------------------------------------------------------
1805  if( (sInfo.status == XRootDStreamInfo::Connected ||
1806  //--------------------------------------------------------------------
1807  // we really need to turn on TLS before we send kXR_endsess and we
1808  // are about to do so (1st enable encryption, then send kXR_endsess)
1809  //--------------------------------------------------------------------
1811  ( info->serverFlags & kXR_tlsSess ) )
1812  {
1813  info->encrypted = true;
1814  return true;
1815  }
1816  }
1817  //--------------------------------------------------------------------------
1818  // A data stream (sub-stream > 0) if need be will be switched to TLS before
1819  // bind.
1820  //--------------------------------------------------------------------------
1821  else
1822  {
1823  //------------------------------------------------------------------------
1824  // We are about to bind a data stream and the server asked to start
1825  // encrypting before bind
1826  //------------------------------------------------------------------------
1827  if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1828  ( info->serverFlags & kXR_tlsData ) )
1829  {
1830  info->encrypted = true;
1831  return true;
1832  }
1833  }
1834 
1835  return false;
1836  }
1837 
1838  //------------------------------------------------------------------------
1839  // Get bind preference for the next data stream
  //
  // Returns the caller-supplied url unchanged when the channel info holds no
  // bind selector; otherwise returns a URL built from whatever the
  // selector's Get() yields.
  //
  // NOTE(review): the line carrying the return type, the method name and the
  // first parameter is not visible in this listing (the embedded numbering
  // jumps from 1840 to 1842) - confirm the full signature against the
  // original file.
1840  //------------------------------------------------------------------------
1842  AnyObject &channelData )
1843  {
  // Fetch the channel info object stored in the channel data
1844  XRootDChannelInfo *info = 0;
1845  channelData.Get( info );
  // NOTE(review): info is dereferenced without a null check - presumably
  // Get() always fills it in on this path; confirm.
1846  if( !bool( info->bindSelector ) )
1847  return url;
1848 
  // A selector is present: let it pick the address to bind to
1849  return URL( info->bindSelector->Get() );
1850  }
1851 
1852  //----------------------------------------------------------------------------
1853  // Generate the message to be sent as an initial handshake
1854  // (handshake+kXR_protocol)
  //
  // The message buffer is 20 bytes followed by one ClientProtocolRequest and
  // is zeroed before any field is written. The protocol request part is
  // filled in by InitProtocolReq using the given info and expect values.
  // The message is allocated with new and returned as a raw pointer, so the
  // caller is presumably responsible for releasing it.
  //
  // NOTE(review): the declarations of 'init' and 'proto' are not visible in
  // this listing (the embedded numbering skips 1870 and 1874) - confirm
  // against the original file how both pointers are derived from the buffer.
1855  //----------------------------------------------------------------------------
1856  Message *XRootDTransport::GenerateInitialHSProtocol( HandShakeData *hsData,
1857  XRootDChannelInfo *info,
1858  kXR_char expect )
1859  {
1860  Log *log = DefaultEnv::GetLog();
1861  log->Debug( XRootDTransportMsg,
1862  "[%s] Sending out the initial hand shake + kXR_protocol",
1863  hsData->streamName.c_str() );
1864 
1865  Message *msg = new Message();
1866 
  // 20 bytes in front of the protocol request, everything cleared so that
  // fields which are not set explicitly below stay zero
1867  msg->Allocate( 20+sizeof(ClientProtocolRequest) );
1868  msg->Zero();
1869 
  // Only these two fields are set to non-zero values, both converted to
  // network byte order
1871  init->fourth = htonl(4);
1872  init->fifth = htonl(2012);
1873 
  // Fill in the kXR_protocol request part
1875  InitProtocolReq( proto, info, expect );
1876 
1877  return msg;
1878  }
1879 
1880  //------------------------------------------------------------------------
1881  // Generate the protocol message
1882  //------------------------------------------------------------------------
1883  Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1884  XRootDChannelInfo *info,
1885  kXR_char expect )
1886  {
1887  Log *log = DefaultEnv::GetLog();
1888  log->Debug( XRootDTransportMsg,
1889  "[%s] Sending out the kXR_protocol",
1890  hsData->streamName.c_str() );
1891 
1892  Message *msg = new Message();
1893  msg->Allocate( sizeof(ClientProtocolRequest) );
1894  msg->Zero();
1895 
1896  ClientProtocolRequest *proto = (ClientProtocolRequest *)msg->GetBuffer();
1897  InitProtocolReq( proto, info, expect );
1898 
1899  return msg;
1900  }
1901 
1902  //------------------------------------------------------------------------
1903  // Initialize protocol request
  //
  // Fills in the kXR_protocol request pointed to by 'request': the request
  // id and the client protocol version are stored in network byte order and
  // the 'expect' field is set from the argument of the same name.
  //
  // NOTE(review): several lines of this method are not visible in this
  // listing (the embedded numbering skips 1911-1912, 1917, 1921, 1925, 1928
  // and 1937). In particular the declaration of 'env' and the statements
  // controlled by the 'if' lines below are missing, so the text as shown
  // does not reflect the real control flow - consult the original file
  // before changing anything here.
1904  //------------------------------------------------------------------------
1905  void XRootDTransport::InitProtocolReq( ClientProtocolRequest *request,
1906  XRootDChannelInfo *info,
1907  kXR_char expect )
1908  {
1909  request->requestid = htons(kXR_protocol);
1910  request->clientpv = htonl(kXR_PROTOCOLVERSION);
1913 
  // Start from the compiled-in defaults; the environment lookups below
  // overwrite them with the "NoTlsOK" / "TlsNoData" settings
1914  int notlsok = DefaultNoTlsOK;
1915  int tlsnodata = DefaultTlsNoData;
1916 
1918 
1919  env->GetInt( "NoTlsOK", notlsok );
1920 
1922  env->GetInt( "TlsNoData", tlsnodata );
1923 
  // NOTE(review): the statement guarded by this condition is not visible
1924  if (info->encrypted || InitTLS())
1926 
  // NOTE(review): the statement guarded by this condition is not visible
1927  if (info->encrypted && !(notlsok || tlsnodata))
1929 
1930  request->expect = expect;
1931 
1932  //--------------------------------------------------------------------------
1933  // If we are in the course of establishing a connection in the context of
1934  // TPC update the expect! (this will be never followed by a bind)
  // NOTE(review): the statement guarded by this condition is not visible
1935  //--------------------------------------------------------------------------
1936  if( info->istpc )
1938  }
1939 
1940  //----------------------------------------------------------------------------
1941  // Process the server initial handshake response
1942  //----------------------------------------------------------------------------
1943  XRootDStatus XRootDTransport::ProcessServerHS( HandShakeData *hsData,
1944  XRootDChannelInfo *info )
1945  {
1946  Log *log = DefaultEnv::GetLog();
1947 
1948  Message *msg = hsData->in;
1949  ServerResponseHeader *respHdr = (ServerResponseHeader *)msg->GetBuffer();
1950  ServerInitHandShake *hs = (ServerInitHandShake *)msg->GetBuffer(4);
1951 
1952  if( respHdr->status != kXR_ok )
1953  {
1954  log->Error( XRootDTransportMsg, "[%s] Invalid hand shake response",
1955  hsData->streamName.c_str() );
1956 
1957  return XRootDStatus( stFatal, errHandShakeFailed, 0, "Invalid hand shake response." );
1958  }
1959 
1960  info->protocolVersion = ntohl(hs->protover);
1961  info->serverFlags = ntohl(hs->msgval) == kXR_DataServer ?
1962  kXR_isServer:
1963  kXR_isManager;
1964 
1965  log->Debug( XRootDTransportMsg,
1966  "[%s] Got the server hand shake response (%s, protocol "
1967  "version %x)",
1968  hsData->streamName.c_str(),
1969  ServerFlagsToStr( info->serverFlags ).c_str(),
1970  info->protocolVersion );
1971 
1972  return XRootDStatus( stOK, suContinue );
1973  }
1974 
1975  //----------------------------------------------------------------------------
1976  // Process the protocol response
  //
  // Unmarshalls the kXR_protocol response held in hsData->in and updates
  // info accordingly. Returns:
  //  - the UnMarshallBody / ProcessProtocolBody status if either fails,
  //  - a fatal errHandShakeFailed status if the response is not kXR_ok,
  //  - a fatal errTlsError status if encryption was requested but the server
  //    cannot provide it (and falling back is not allowed),
  //  - (stOK, suRetry) if the protocol request has to be repeated with
  //    encryption switched on,
  //  - (stOK, suContinue) otherwise.
  //
  // NOTE(review): 'env' is used below but its declaration is not visible in
  // this listing (the embedded numbering jumps from 1997 to 1999) - confirm
  // against the original file.
1977  //----------------------------------------------------------------------------
1978  XRootDStatus XRootDTransport::ProcessProtocolResp( HandShakeData *hsData,
1979  XRootDChannelInfo *info )
1980  {
1981  Log *log = DefaultEnv::GetLog();
1982 
1983  XRootDStatus st = UnMarshallBody( hsData->in, kXR_protocol );
1984  if( !st.IsOK() )
1985  return st;
1986 
1987  ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
1988 
1989 
1990  if( rsp->hdr.status != kXR_ok )
1991  {
1992  log->Error( XRootDTransportMsg, "[%s] kXR_protocol request failed",
1993  hsData->streamName.c_str() );
1994 
1995  return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_protocol request failed" );
1996  }
1997 
  // Default first, then whatever the "NoTlsOK" setting in the environment says
1999  int notlsok = DefaultNoTlsOK;
2000  env->GetInt( "NoTlsOK", notlsok );
2001 
2002  if( rsp->body.protocol.pval < kXR_PROTTLSVERSION && info->encrypted )
2003  {
2004  //------------------------------------------------------------------------
2005  // User requested an encrypted connection but the server is too old to
2006  // support it!
2007  //------------------------------------------------------------------------
2008  if( !notlsok ) return XRootDStatus( stFatal, errTlsError, ENOTSUP, "TLS not supported" );
2009 
2010  //------------------------------------------------------------------------
2011  // We are falling back to unencrypted data transmission, as configured
2012  // in XRD_NOTLSOK environment variable
2013  //------------------------------------------------------------------------
2014  log->Info( XRootDTransportMsg,
2015  "[%s] Falling back to unencrypted transmission, server does "
2016  "not support TLS encryption.",
2017  hsData->streamName.c_str() );
2018  info->encrypted = false;
2019  }
2020 
  // The flags from the protocol response replace the ones stored so far only
  // when the reported protocol value is at least 0x297
2021  if( rsp->body.protocol.pval >= 0x297 )
2022  info->serverFlags = rsp->body.protocol.flags;
2023 
  // More than 8 bytes of payload: keep a copy of pval and flags and hand
  // everything after those 8 bytes to ProcessProtocolBody
  // NOTE(review): a protRespBody left over from an earlier pass is not
  // released before being overwritten here; since this method can answer
  // with suRetry it is presumably run more than once per connection -
  // confirm whether that leaks.
2024  if( rsp->hdr.dlen > 8 )
2025  {
2026  info->protRespBody = new ServerResponseBody_Protocol();
2027  info->protRespBody->flags = rsp->body.protocol.flags;
2028  info->protRespBody->pval = rsp->body.protocol.pval;
2029 
2030  char* bodybuff = reinterpret_cast<char*>( &rsp->body.protocol.secreq );
2031  size_t bodysize = rsp->hdr.dlen - 8;
2032  XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2033  if( !st.IsOK() )
2034  return st;
2035  }
2036 
2037  log->Debug( XRootDTransportMsg,
2038  "[%s] kXR_protocol successful (%s, protocol version %x)",
2039  hsData->streamName.c_str(),
2040  ServerFlagsToStr( info->serverFlags ).c_str(),
2041  info->protocolVersion );
2042 
2043  if( !( info->serverFlags & kXR_haveTLS ) && info->encrypted )
2044  {
2045  //------------------------------------------------------------------------
2046  // User requested an encrypted connection but the server was not configured
2047  // to support encryption!
2048  //------------------------------------------------------------------------
2049  return XRootDStatus( stFatal, errTlsError, ECONNREFUSED,
2050  "Server was not configured to support encryption." );
2051  }
2052 
2053  //--------------------------------------------------------------------------
2054  // Now see if we have to enforce encryption in case the server does not
2055  // support PgRead/PgWrite
2056  //--------------------------------------------------------------------------
2057  int tlsOnNoPgrw = DefaultWantTlsOnNoPgrw;
2058  env->GetInt( "WantTlsOnNoPgrw", tlsOnNoPgrw );
2059  if( !( info->serverFlags & kXR_suppgrw ) && tlsOnNoPgrw )
2060  {
2061  //------------------------------------------------------------------------
2062  // If user requested encryption just make sure it is not switched off for
2063  // data
2064  //------------------------------------------------------------------------
2065  if( info->encrypted )
2066  {
2067  log->Debug( XRootDTransportMsg,
2068  "[%s] Server does not support PgRead/PgWrite and"
2069  " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2070  hsData->streamName.c_str() );
  // Writes the default back into the environment object, i.e. this
  // changes the "TlsNoData" setting held by env, not just a local value
2071  env->PutInt( "TlsNoData", DefaultTlsNoData );
2072  }
2073  //------------------------------------------------------------------------
2074  // Otherwise, if server is not enforcing data encryption, we will need to
2075  // redo the protocol request with kXR_wantTLS set.
2076  //------------------------------------------------------------------------
2077  else if( !( info->serverFlags & kXR_tlsData ) &&
2078  ( info->serverFlags & kXR_haveTLS ) )
2079  {
2080  info->encrypted = true;
2081  return XRootDStatus( stOK, suRetry );
2082  }
2083  }
2084 
2085  return XRootDStatus( stOK, suContinue );
2086  }
2087 
2088  XRootDStatus XRootDTransport::ProcessProtocolBody( char *bodybuff,
2089  size_t bodysize,
2090  XRootDChannelInfo *info )
2091  {
2092  //--------------------------------------------------------------------------
2093  // Parse bind preferences
2094  //--------------------------------------------------------------------------
2095  XrdProto::bifReqs *bifreq = reinterpret_cast<XrdProto::bifReqs*>( bodybuff );
2096  if( bodysize >= sizeof( XrdProto::bifReqs ) && bifreq->theTag == 'B' )
2097  {
2098  bodybuff += sizeof( XrdProto::bifReqs );
2099  bodysize -= sizeof( XrdProto::bifReqs );
2100 
2101  if( bodysize < bifreq->bifILen )
2102  return XRootDStatus( stError, errDataError, 0, "Received incomplete "
2103  "protocol response." );
2104  std::string bindprefs_str( bodybuff, bifreq->bifILen );
2105  std::vector<std::string> bindprefs;
2106  Utils::splitString( bindprefs, bindprefs_str, "," );
2107  info->bindSelector.reset( new BindPrefSelector( std::move( bindprefs ) ) );
2108  bodybuff += bifreq->bifILen;
2109  bodysize -= bifreq->bifILen;
2110  }
2111  //--------------------------------------------------------------------------
2112  // Parse security requirements
2113  //--------------------------------------------------------------------------
2114  XrdProto::secReqs *secreq = reinterpret_cast<XrdProto::secReqs*>( bodybuff );
2115  if( bodysize >= 6 /*XrdProto::secReqs*/ && secreq->theTag == 'S' )
2116  {
2117  memcpy( &info->protRespBody->secreq, secreq, bodysize );
2118  info->protRespSize = bodysize + 8 /*pval & flags*/;
2119  }
2120 
2121  return XRootDStatus();
2122  }
2123 
2124  //----------------------------------------------------------------------------
2125  // Generate the bind message
2126  //----------------------------------------------------------------------------
2127  Message *XRootDTransport::GenerateBind( HandShakeData *hsData,
2128  XRootDChannelInfo *info )
2129  {
2130  Log *log = DefaultEnv::GetLog();
2131 
2132  log->Debug( XRootDTransportMsg,
2133  "[%s] Sending out the bind request",
2134  hsData->streamName.c_str() );
2135 
2136 
2137  Message *msg = new Message( sizeof( ClientBindRequest ) );
2138  ClientBindRequest *bindReq = (ClientBindRequest *)msg->GetBuffer();
2139 
2140  bindReq->requestid = kXR_bind;
2141  memcpy( bindReq->sessid, info->sessionId, 16 );
2142  bindReq->dlen = 0;
2143  MarshallRequest( msg );
2144  return msg;
2145  }
2146 
2147  //----------------------------------------------------------------------------
2148  // Generate the bind message
2149  //----------------------------------------------------------------------------
2150  XRootDStatus XRootDTransport::ProcessBindResp( HandShakeData *hsData,
2151  XRootDChannelInfo *info )
2152  {
2153  Log *log = DefaultEnv::GetLog();
2154 
2155  XRootDStatus st = UnMarshallBody( hsData->in, kXR_bind );
2156  if( !st.IsOK() )
2157  return st;
2158 
2159  ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2160 
2161  if( rsp->hdr.status != kXR_ok )
2162  {
2163  log->Error( XRootDTransportMsg, "[%s] kXR_bind request failed",
2164  hsData->streamName.c_str() );
2165  return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_bind request failed" );
2166  }
2167 
2168  info->stream[hsData->subStreamId].pathId = rsp->body.bind.substreamid;
2169  log->Debug( XRootDTransportMsg, "[%s] kXR_bind successful",
2170  hsData->streamName.c_str() );
2171 
2172  return XRootDStatus();
2173  }
2174 
2175  //----------------------------------------------------------------------------
2176  // Generate the login message
  //
  // Builds a kXR_login request consisting of a ClientLoginRequest followed
  // by a CGI string (country code, time zone, application name, monitoring
  // info, host name, release name and - if present - the login token stored
  // in info). The ability flags advertise the IP stack configuration of this
  // host and the user name is taken from the URL or, failing that, from the
  // effective user id. The message is passed through MarshallRequest before
  // it is returned as a raw pointer obtained from new.
  //
  // NOTE(review): two lines of this method are not visible in this listing
  // (the embedded numbering skips 2222 and 2235). 'stacks' is used below
  // without a visible declaration and 'ability' is only OR-ed into without a
  // visible initial assignment - confirm both against the original file.
2177  //----------------------------------------------------------------------------
2178  Message *XRootDTransport::GenerateLogIn( HandShakeData *hsData,
2179  XRootDChannelInfo *info )
2180  {
2181  Log *log = DefaultEnv::GetLog();
2182  Env *env = DefaultEnv::GetEnv();
2183 
2184  //--------------------------------------------------------------------------
2185  // Compute the login cgi
2186  //--------------------------------------------------------------------------
2187  int timeZone = XrdSysTimer::TimeZone();
  // hostName is released with free() further down
2188  char *hostName = XrdNetUtils::MyHostName();
2189  std::string countryCode = Utils::FQDNToCC( hostName );
  // The buffer is sized for 1024 bytes plus the login token
2190  char *cgiBuffer = new char[1024 + info->logintoken.size()];
2191  std::string appName;
2192  std::string monInfo;
2193  env->GetString( "AppName", appName );
2194  env->GetString( "MonInfo", monInfo );
2195  if( info->logintoken.empty() )
2196  {
2197  snprintf( cgiBuffer, 1024,
2198  "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2199  "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2200  appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2201  }
2202  else
2203  {
  // NOTE(review): the limit passed to snprintf is 1024 although the buffer
  // was allocated with room for the login token as well, so the output is
  // cut off at 1023 characters even when the token would still fit -
  // presumably the allocated size was meant to be used here; confirm.
2204  snprintf( cgiBuffer, 1024,
2205  "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2206  "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2207  appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2208  }
  // NOTE(review): the length is stored in 16 bits; a longer string would be
  // silently truncated by this conversion (not reachable while the limit
  // above stays at 1024).
2209  uint16_t cgiLen = strlen( cgiBuffer );
2210  free( hostName );
2211 
2212  //--------------------------------------------------------------------------
2213  // Generate the message
2214  //--------------------------------------------------------------------------
2215  Message *msg = new Message( sizeof(ClientLoginRequest) + cgiLen );
2216  ClientLoginRequest *loginReq = (ClientLoginRequest *)msg->GetBuffer();
2217 
2218  loginReq->requestid = kXR_login;
2219  loginReq->pid = ::getpid();
2220  loginReq->capver[0] = kXR_asyncap | kXR_ver005;
2221  loginReq->dlen = cgiLen;
2223 #ifdef WITH_XRDEC
2224  loginReq->ability2 = kXR_ecredir;
2225 #endif
2226 
  // Advertise multi-protocol support only when the "MultiProtocol" setting
  // is non-zero
2227  int multiProtocol = 0;
2228  env->GetInt( "MultiProtocol", multiProtocol );
2229  if(multiProtocol)
2230  loginReq->ability |= kXR_multipr;
2231 
2232  //--------------------------------------------------------------------------
2233  // Check the IP stacks
2234  //--------------------------------------------------------------------------
2236  bool dualStack = false;
2237  bool privateIPv6 = false;
2238  bool privateIPv4 = false;
2239 
  // Both IPv4 and IPv6 are available
2240  if( (stacks & XrdNetUtils::hasIP64) == XrdNetUtils::hasIP64 )
2241  {
2242  dualStack = true;
2243  loginReq->ability |= kXR_hasipv64;
2244  }
2245 
  // IPv6 is available but no public IPv6 address
2246  if( (stacks & XrdNetUtils::hasIPv6) && !(stacks & XrdNetUtils::hasPub6) )
2247  {
2248  privateIPv6 = true;
2249  loginReq->ability |= kXR_onlyprv6;
2250  }
2251 
  // IPv4 is available but no public IPv4 address
2252  if( (stacks & XrdNetUtils::hasIPv4) && !(stacks & XrdNetUtils::hasPub4) )
2253  {
2254  privateIPv4 = true;
2255  loginReq->ability |= kXR_onlyprv4;
2256  }
2257 
2258  // The following code snippet tries to overcome the problem that this host
2259  // may still be dual-stacked but we don't know it because one of the
2260  // interfaces was not registered in DNS.
2261  //
  // I.e. if we know about one IP family only but are talking to the server
  // over the other one, we evidently have both.
2262  if( !dualStack && hsData->serverAddr )
2263  {if ( ( ( stacks & XrdNetUtils::hasIPv4 )
2264  && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv6))
2265  || ( ( stacks & XrdNetUtils::hasIPv6 )
2266  && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv4)))
2267  {dualStack = true;
2268  loginReq->ability |= kXR_hasipv64;
2269  }
2270  }
2271 
2272  //--------------------------------------------------------------------------
2273  // Check the username
2274  //--------------------------------------------------------------------------
  // Prefer the user name from the URL; otherwise look up the name of the
  // effective user id and fall back to "_anon_" if that fails
2275  std::string buffer( 8, 0 );
2276  if( hsData->url->GetUserName().length() )
2277  buffer = hsData->url->GetUserName();
2278  else
2279  {
2280  char *name = new char[1024];
2281  if( !XrdOucUtils::UserName( geteuid(), name, 1024 ) )
2282  buffer = name;
2283  else
2284  buffer = "_anon_";
2285  delete [] name;
2286  }
  // Exactly 8 bytes are copied: longer names are cut, shorter ones are
  // padded with zero bytes
2287  buffer.resize( 8, 0 );
2288  std::copy( buffer.begin(), buffer.end(), (char*)loginReq->username );
2289 
  // NOTE(review): presumably 24 is the offset of the payload right after the
  // fixed-size login request - confirm against Message::Append.
2290  msg->Append( cgiBuffer, cgiLen, 24 );
2291 
  // NOTE(review): username is printed with %s although the 8 bytes copied
  // above carry no terminator when the name has 8 or more characters -
  // confirm that the log output cannot run past the field.
2292  log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_login request, "
2293  "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2294  "private IPv6: %s", hsData->streamName.c_str(),
2295  loginReq->username, cgiBuffer, dualStack ? "true" : "false",
2296  privateIPv4 ? "true" : "false",
2297  privateIPv6 ? "true" : "false" );
2298 
2299  delete [] cgiBuffer;
2300  MarshallRequest( msg );
2301  return msg;
2302  }
2303 
2304  //----------------------------------------------------------------------------
2305  // Process the protocol response
2306  //----------------------------------------------------------------------------
2307  XRootDStatus XRootDTransport::ProcessLogInResp( HandShakeData *hsData,
2308  XRootDChannelInfo *info )
2309  {
2310  Log *log = DefaultEnv::GetLog();
2311 
2312  XRootDStatus st = UnMarshallBody( hsData->in, kXR_login );
2313  if( !st.IsOK() )
2314  return st;
2315 
2316  ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2317 
2318  if( rsp->hdr.status != kXR_ok )
2319  {
2320  log->Error( XRootDTransportMsg, "[%s] Got invalid login response",
2321  hsData->streamName.c_str() );
2322  return XRootDStatus( stFatal, errLoginFailed, 0, "Got invalid login response." );
2323  }
2324 
2325  if( !info->firstLogIn )
2326  memcpy( info->oldSessionId, info->sessionId, 16 );
2327 
2328  if( rsp->hdr.dlen == 0 && info->protocolVersion <= 0x289 )
2329  {
2330  //--------------------------------------------------------------------------
2331  // This if statement is there only to support dCache inaccurate
2332  // implementation of XRoot protocol, that in some cases returns
2333  // an empty login response for protocol version <= 2.8.9.
2334  //--------------------------------------------------------------------------
2335  memset( info->sessionId, 0, 16 );
2336  log->Warning( XRootDTransportMsg,
2337  "[%s] Logged in, accepting empty login response.",
2338  hsData->streamName.c_str() );
2339  return XRootDStatus();
2340  }
2341 
2342  if( rsp->hdr.dlen < 16 )
2343  return XRootDStatus( stError, errDataError, 0, "Login response too short." );
2344 
2345  memcpy( info->sessionId, rsp->body.login.sessid, 16 );
2346 
2347  std::string sessId = Utils::Char2Hex( rsp->body.login.sessid, 16 );
2348 
2349  log->Debug( XRootDTransportMsg, "[%s] Logged in, session: %s",
2350  hsData->streamName.c_str(), sessId.c_str() );
2351 
2352  //--------------------------------------------------------------------------
2353  // We have an authentication info to process
2354  //--------------------------------------------------------------------------
2355  if( rsp->hdr.dlen > 16 )
2356  {
2357  size_t len = rsp->hdr.dlen-16;
2358  info->authBuffer = new char[len+1];
2359  info->authBuffer[len] = 0;
2360  memcpy( info->authBuffer, rsp->body.login.sec, len );
2361  log->Debug( XRootDTransportMsg, "[%s] Authentication is required: %s",
2362  hsData->streamName.c_str(), info->authBuffer );
2363 
2364  return XRootDStatus( stOK, suContinue );
2365  }
2366 
2367  return XRootDStatus();
2368  }
2369 
2370  //----------------------------------------------------------------------------
2371  // Do the authentication
2372  //----------------------------------------------------------------------------
2373  XRootDStatus XRootDTransport::DoAuthentication( HandShakeData *hsData,
2374  XRootDChannelInfo *info )
2375  {
2376  //--------------------------------------------------------------------------
2377  // Prepare
2378  //--------------------------------------------------------------------------
2379  Log *log = DefaultEnv::GetLog();
2380  XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2381  XrdSecCredentials *credentials = 0;
2382  std::string protocolName;
2383 
2384  //--------------------------------------------------------------------------
2385  // We're doing this for the first time
2386  //--------------------------------------------------------------------------
2387  if( sInfo.status == XRootDStreamInfo::LoginSent )
2388  {
2389  log->Debug( XRootDTransportMsg, "[%s] Sending authentication data",
2390  hsData->streamName.c_str() );
2391 
2392  //------------------------------------------------------------------------
2393  // Set up the authentication environment
2394  //------------------------------------------------------------------------
2395  info->authEnv = new XrdOucEnv();
2396  info->authEnv->Put( "sockname", hsData->clientName.c_str() );
2397  info->authEnv->Put( "username", hsData->url->GetUserName().c_str() );
2398  info->authEnv->Put( "password", hsData->url->GetPassword().c_str() );
2399 
2400  const URL::ParamsMap &urlParams = hsData->url->GetParams();
2401  URL::ParamsMap::const_iterator it;
2402  for( it = urlParams.begin(); it != urlParams.end(); ++it )
2403  {
2404  if( it->first.compare( 0, 4, "xrd." ) == 0 ||
2405  it->first.compare( 0, 6, "xrdcl." ) == 0 )
2406  info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2407  }
2408 
2409  //------------------------------------------------------------------------
2410  // Initialize some other structs
2411  //------------------------------------------------------------------------
2412  size_t authBuffLen = strlen( info->authBuffer );
2413  char *pars = (char *)malloc( authBuffLen + 1 );
2414  memcpy( pars, info->authBuffer, authBuffLen );
2415  info->authParams = new XrdSecParameters( pars, authBuffLen );
2416  sInfo.status = XRootDStreamInfo::AuthSent;
2417  delete [] info->authBuffer;
2418  info->authBuffer = 0;
2419 
2420  //------------------------------------------------------------------------
2421  // Find a protocol that gives us valid credentials
2422  //------------------------------------------------------------------------
2423  XRootDStatus st = GetCredentials( credentials, hsData, info );
2424  if( !st.IsOK() )
2425  {
2426  CleanUpAuthentication( info );
2427  return st;
2428  }
2429  protocolName = info->authProtocol->Entity.prot;
2430  }
2431 
2432  //--------------------------------------------------------------------------
2433  // We've been here already
2434  //--------------------------------------------------------------------------
2435  else
2436  {
2437  ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2438  protocolName = info->authProtocol->Entity.prot;
2439 
2440  //------------------------------------------------------------------------
2441  // We're required to send out more authentication data
2442  //------------------------------------------------------------------------
2443  if( rsp->hdr.status == kXR_authmore )
2444  {
2445  log->Debug( XRootDTransportMsg,
2446  "[%s] Sending more authentication data for %s",
2447  hsData->streamName.c_str(), protocolName.c_str() );
2448 
2449  uint32_t len = rsp->hdr.dlen;
2450  char *secTokenData = (char*)malloc( len );
2451  memcpy( secTokenData, rsp->body.authmore.data, len );
2452  XrdSecParameters *secToken = new XrdSecParameters( secTokenData, len );
2453  XrdOucErrInfo ei( "", info->authEnv);
2454  credentials = info->authProtocol->getCredentials( secToken, &ei );
2455  delete secToken;
2456 
2457  //----------------------------------------------------------------------
2458  // The protocol handler refuses to give us the data
2459  //----------------------------------------------------------------------
2460  if( !credentials )
2461  {
2462  log->Error( XRootDTransportMsg,
2463  "[%s] Auth protocol handler for %s refuses to give "
2464  "us more credentials %s",
2465  hsData->streamName.c_str(), protocolName.c_str(),
2466  ei.getErrText() );
2467  CleanUpAuthentication( info );
2468  return XRootDStatus( stFatal, errAuthFailed, 0, ei.getErrText() );
2469  }
2470  }
2471 
2472  //------------------------------------------------------------------------
2473  // We have succeeded
2474  //------------------------------------------------------------------------
2475  else if( rsp->hdr.status == kXR_ok )
2476  {
2477  info->authProtocolName = info->authProtocol->Entity.prot;
2478 
2479  //----------------------------------------------------------------------
2480  // Do we need protection?
2481  //----------------------------------------------------------------------
2482  if( info->protRespBody )
2483  {
2484  int rc = XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2485  if( rc > 0 )
2486  {
2487  log->Debug( XRootDTransportMsg,
2488  "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2489  }
2490  else if( rc == 0 )
2491  {
2492  log->Debug( XRootDTransportMsg,
2493  "[%s] XrdSecProtect: no protection needed.",
2494  hsData->streamName.c_str() );
2495  }
2496  else
2497  {
2498  log->Debug( XRootDTransportMsg,
2499  "[%s] Failed to load XrdSecProtect: %s",
2500  hsData->streamName.c_str(), XrdSysE2T( -rc ) );
2501  CleanUpAuthentication( info );
2502 
2503  return XRootDStatus( stError, errAuthFailed, -rc, XrdSysE2T( -rc ) );
2504  }
2505  }
2506 
2507  if( !info->protection )
2508  CleanUpAuthentication( info );
2509  else
2510  pSecUnloadHandler->Register( info->authProtocolName );
2511 
2512  log->Debug( XRootDTransportMsg,
2513  "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2514  protocolName.c_str() );
2515 
2516  //--------------------------------------------------------------------
2517  // Clear the SSL error queue of the calling thread, as there might be
2518  // some leftover from the authentication!
2519  //--------------------------------------------------------------------
2521 
2522  return XRootDStatus();
2523  }
2524  //------------------------------------------------------------------------
2525  // Failure
2526  //------------------------------------------------------------------------
2527  else if( rsp->hdr.status == kXR_error )
2528  {
2529  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
2530  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
2531  log->Error( XRootDTransportMsg,
2532  "[%s] Authentication with %s failed: %s",
2533  hsData->streamName.c_str(), protocolName.c_str(),
2534  errmsg );
2535  delete [] errmsg;
2536 
2537  info->authProtocol->Delete();
2538  info->authProtocol = 0;
2539 
2540  //----------------------------------------------------------------------
2541  // Find another protocol that gives us valid credentials
2542  //----------------------------------------------------------------------
2543  XRootDStatus st = GetCredentials( credentials, hsData, info );
2544  if( !st.IsOK() )
2545  {
2546  CleanUpAuthentication( info );
2547  return st;
2548  }
2549  protocolName = info->authProtocol->Entity.prot;
2550  }
2551  //------------------------------------------------------------------------
2552  // God knows what
2553  //------------------------------------------------------------------------
2554  else
2555  {
2556  info->authProtocolName = info->authProtocol->Entity.prot;
2557  CleanUpAuthentication( info );
2558 
2559  log->Error( XRootDTransportMsg,
2560  "[%s] Authentication with %s failed: unexpected answer",
2561  hsData->streamName.c_str(), protocolName.c_str() );
2562  return XRootDStatus( stFatal, errAuthFailed, 0, "Authentication failed: unexpected answer." );
2563  }
2564  }
2565 
2566  //--------------------------------------------------------------------------
2567  // Generate the client request
2568  //--------------------------------------------------------------------------
2569  Message *msg = new Message( sizeof(ClientAuthRequest)+credentials->size );
2570  msg->Zero();
2571  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
2572  char *reqBuffer = msg->GetBuffer(sizeof(ClientAuthRequest));
2573 
2574  req->header.requestid = kXR_auth;
2575  req->auth.dlen = credentials->size;
2576  memcpy( req->auth.credtype, protocolName.c_str(),
2577  protocolName.length() > 4 ? 4 : protocolName.length() );
2578 
2579  memcpy( reqBuffer, credentials->buffer, credentials->size );
2580  hsData->out = msg;
2581  MarshallRequest( msg );
2582  delete credentials;
2583 
2584  //------------------------------------------------------------------------
2585  // Clear the SSL error queue of the calling thread, as there might be
2586  // some leftover from the authentication!
2587  //------------------------------------------------------------------------
2589 
2590  return XRootDStatus( stOK, suContinue );
2591  }
2592 
2593  //------------------------------------------------------------------------
2594  // Get the initial credentials using one of the protocols
2595  //------------------------------------------------------------------------
2596  XRootDStatus XRootDTransport::GetCredentials( XrdSecCredentials *&credentials,
2597  HandShakeData *hsData,
2598  XRootDChannelInfo *info )
2599  {
2600  //--------------------------------------------------------------------------
2601  // Set up the auth handler
2602  //--------------------------------------------------------------------------
2603  Log *log = DefaultEnv::GetLog();
2604  XrdOucErrInfo ei( "", info->authEnv);
2605  XrdSecGetProt_t authHandler = GetAuthHandler();
2606  if( !authHandler )
2607  return XRootDStatus( stFatal, errAuthFailed, 0, "Could not load authentication handler." );
2608 
2609  //--------------------------------------------------------------------------
2610  // Retrieve secuid and secgid, if available. These will override the fsuid
2611  // and fsgid of the current thread reading the credentials to prevent
2612  // security holes in case this process is running with elevated permissions.
2613  //--------------------------------------------------------------------------
2614  char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secuid") : 0;
2615  char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secgid") : 0;
2616 
2617  int secuid = -1;
2618  int secgid = -1;
2619 
2620  if(secuidc) secuid = atoi(secuidc);
2621  if(secgidc) secgid = atoi(secgidc);
2622 
2623 #ifdef __linux__
2624  ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2625  if(!uidSetter.IsOk()) {
2626  log->Error( XRootDTransportMsg, "[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2627  hsData->streamName.c_str(), secuid, secgid );
2628  return XRootDStatus( stFatal, errAuthFailed, 0, "Error while setting (fsuid, fsgid)." );
2629  }
2630 #else
2631  if(secuid >= 0 || secgid >= 0) {
2632  log->Error( XRootDTransportMsg, "[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2633  hsData->streamName.c_str() );
2634  return XRootDStatus( stFatal, errAuthFailed, 0, "xrdcl.secuid and xrdcl.secgid"
2635  " only supported on Linux" );
2636  }
2637 #endif
2638 
2639  //--------------------------------------------------------------------------
2640  // Loop over the possible protocols to find one that gives us valid
2641  // credentials
2642  //--------------------------------------------------------------------------
2643  XrdNetAddr &srvAddrInfo = *const_cast<XrdNetAddr *>(hsData->serverAddr);
2644  srvAddrInfo.SetTLS( info->encrypted );
2645  while(1)
2646  {
2647  //------------------------------------------------------------------------
2648  // Get the protocol
2649  //------------------------------------------------------------------------
2650  info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2651  srvAddrInfo,
2652  *info->authParams,
2653  &ei );
2654  if( !info->authProtocol )
2655  {
2656  log->Error( XRootDTransportMsg, "[%s] No protocols left to try",
2657  hsData->streamName.c_str() );
2658  return XRootDStatus( stFatal, errAuthFailed, 0, "No protocols left to try" );
2659  }
2660 
2661  std::string protocolName = info->authProtocol->Entity.prot;
2662  log->Debug( XRootDTransportMsg, "[%s] Trying to authenticate using %s",
2663  hsData->streamName.c_str(), protocolName.c_str() );
2664 
2665  //------------------------------------------------------------------------
2666  // Get the credentials from the current protocol
2667  //------------------------------------------------------------------------
2668  credentials = info->authProtocol->getCredentials( 0, &ei );
2669  if( !credentials )
2670  {
2671  log->Debug( XRootDTransportMsg,
2672  "[%s] Cannot get credentials for protocol %s: %s",
2673  hsData->streamName.c_str(), protocolName.c_str(),
2674  ei.getErrText() );
2675  info->authProtocol->Delete();
2676  continue;
2677  }
2678  return XRootDStatus( stOK, suContinue );
2679  }
2680  }
2681 
2682  //------------------------------------------------------------------------
2683  // Clean up the data structures created for the authentication process
2684  //------------------------------------------------------------------------
2685  Status XRootDTransport::CleanUpAuthentication( XRootDChannelInfo *info )
2686  {
2687  if( info->authProtocol )
2688  info->authProtocol->Delete();
2689  delete info->authParams;
2690  delete info->authEnv;
2691  info->authProtocol = 0;
2692  info->authParams = 0;
2693  info->authEnv = 0;
2695  return Status();
2696  }
2697 
2698  //------------------------------------------------------------------------
2699  // Clean up the data structures created for the protection purposes
2700  //------------------------------------------------------------------------
2701  Status XRootDTransport::CleanUpProtection( XRootDChannelInfo *info )
2702  {
2703  XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
2704  if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
2705 
2706  if( info->protection )
2707  {
2708  info->protection->Delete();
2709  info->protection = 0;
2710 
2711  CleanUpAuthentication( info );
2712  }
2713 
2714  if( info->protRespBody )
2715  {
2716  delete info->protRespBody;
2717  info->protRespBody = 0;
2718  info->protRespSize = 0;
2719  }
2720 
2721  return Status();
2722  }
2723 
2724  //----------------------------------------------------------------------------
2725  // Get the authentication function handle
2726  //----------------------------------------------------------------------------
2727  XrdSecGetProt_t XRootDTransport::GetAuthHandler()
2728  {
2729  Log *log = DefaultEnv::GetLog();
2730  char errorBuff[1024];
2731 
2732  // the static constructor is invoked only once and it is guaranteed that this
2733  // is thread safe
2734  static std::atomic<XrdSecGetProt_t> authHandler( XrdSecLoadSecFactory( errorBuff, 1024 ) );
2735  auto ret = authHandler.load( std::memory_order_relaxed );
2736  if( ret ) return ret;
2737 
2738  // if we are here it means we failed to load the security library for the
2739  // first time and we hope the environment changed
2740 
2741  // obtain a lock
2742  static XrdSysMutex mtx;
2743  XrdSysMutexHelper lck( mtx );
2744  // check if in the meanwhile some else didn't load the library
2745  ret = authHandler.load( std::memory_order_relaxed );
2746  if( ret ) return ret;
2747 
2748  // load the library
2749  ret = XrdSecLoadSecFactory( errorBuff, 1024 );
2750  authHandler.store( ret, std::memory_order_relaxed );
2751  // if we failed report an error
2752  if( !ret )
2753  {
2754  log->Error( XRootDTransportMsg,
2755  "Unable to get the security framework: %s", errorBuff );
2756  return 0;
2757  }
2758  return ret;
2759  }
2760 
2761  //----------------------------------------------------------------------------
2762  // Generate the end session message
2763  //----------------------------------------------------------------------------
2764  Message *XRootDTransport::GenerateEndSession( HandShakeData *hsData,
2765  XRootDChannelInfo *info )
2766  {
2767  Log *log = DefaultEnv::GetLog();
2768 
2769  //--------------------------------------------------------------------------
2770  // Generate the message
2771  //--------------------------------------------------------------------------
2772  Message *msg = new Message( sizeof(ClientEndsessRequest) );
2773  ClientEndsessRequest *endsessReq = (ClientEndsessRequest *)msg->GetBuffer();
2774 
2775  endsessReq->requestid = kXR_endsess;
2776  memcpy( endsessReq->sessid, info->oldSessionId, 16 );
2777  std::string sessId = Utils::Char2Hex( endsessReq->sessid, 16 );
2778 
2779  log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_endsess for session:"
2780  " %s", hsData->streamName.c_str(), sessId.c_str() );
2781 
2782  MarshallRequest( msg );
2783 
2784  Message *sign = 0;
2785  GetSignature( msg, sign, info );
2786  if( sign )
2787  {
2788  //------------------------------------------------------------------------
2789  // Now place both the signature and the request in a single buffer
2790  //------------------------------------------------------------------------
2791  uint32_t size = sign->GetSize();
2792  sign->ReAllocate( size + msg->GetSize() );
2793  char* buffer = sign->GetBuffer( size );
2794  memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2795  msg->Grab( sign->GetBuffer(), sign->GetSize() );
2796  }
2797 
2798  return msg;
2799  }
2800 
2801  //----------------------------------------------------------------------------
2802  // Process the end session response
2803  //----------------------------------------------------------------------------
2804  Status XRootDTransport::ProcessEndSessionResp( HandShakeData *hsData,
2805  XRootDChannelInfo *info )
2806  {
2807  Log *log = DefaultEnv::GetLog();
2808 
2809  Status st = UnMarshallBody( hsData->in, kXR_endsess );
2810  if( !st.IsOK() )
2811  return st;
2812 
2813  ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2814 
2815  // If we're good, we're good!
2816  if( rsp->hdr.status == kXR_ok )
2817  return Status();
2818 
2819  // we ignore not found errors as such an error means the connection
2820  // has been already terminated
2821  if( rsp->hdr.status == kXR_error && rsp->body.error.errnum == kXR_NotFound )
2822  return Status();
2823 
2824  // other errors
2825  if( rsp->hdr.status == kXR_error )
2826  {
2827  std::string errorMsg( rsp->body.error.errmsg, rsp->hdr.dlen - 4 );
2828  log->Error( XRootDTransportMsg, "[%s] Got error response to "
2829  "kXR_endsess: %s", hsData->streamName.c_str(),
2830  errorMsg.c_str() );
2831  return Status( stFatal, errHandShakeFailed );
2832  }
2833 
2834  // Wait Response.
2835  if( rsp->hdr.status == kXR_wait )
2836  {
2837  std::string msg( rsp->body.wait.infomsg, rsp->hdr.dlen - 4 );
2838  log->Info( XRootDTransportMsg, "[%s] Got wait response to "
2839  "kXR_endsess: %s", hsData->streamName.c_str(),
2840  msg.c_str() );
2841  hsData->out = GenerateEndSession( hsData, info );
2842  return Status( stOK, suRetry );
2843  }
2844 
2845  // Any other response is protocol violation
2846  return Status( stError, errDataError );
2847  }
2848 
2849  //----------------------------------------------------------------------------
2850  // Get a string representation of the server flags
2851  //----------------------------------------------------------------------------
2852  std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2853  {
2854  std::string repr = "type: ";
2855  if( flags & kXR_isManager )
2856  repr += "manager ";
2857 
2858  else if( flags & kXR_isServer )
2859  repr += "server ";
2860 
2861  repr += "[";
2862 
2863  if( flags & kXR_attrMeta )
2864  repr += "meta ";
2865 
2866  else if( flags & kXR_attrCache )
2867  repr += "cache ";
2868 
2869  else if( flags & kXR_attrProxy )
2870  repr += "proxy ";
2871 
2872  else if( flags & kXR_attrSuper )
2873  repr += "super ";
2874 
2875  else
2876  repr += " ";
2877 
2878  repr.erase( repr.length()-1, 1 );
2879 
2880  repr += "]";
2881  return repr;
2882  }
2883 }
2884 
2885 namespace
2886 {
2887  // Extract file name from a request
2888  //----------------------------------------------------------------------------
2889  char *GetDataAsString( char *msg )
2890  {
2891  ClientRequestHdr *req = (ClientRequestHdr*)msg;
2892  char *fn = new char[req->dlen+1];
2893  memcpy( fn, msg + 24, req->dlen );
2894  fn[req->dlen] = 0;
2895  return fn;
2896  }
2897 }
2898 
2899 namespace XrdCl
2900 {
2901  //----------------------------------------------------------------------------
2902  // Get the description of a message
2903  //----------------------------------------------------------------------------
2904  void XRootDTransport::GenerateDescription( char *msg, std::ostringstream &o )
2905  {
2906  Log *log = DefaultEnv::GetLog();
2907  if( log->GetLevel() < Log::ErrorMsg )
2908  return;
2909 
2910  ClientRequestHdr *req = (ClientRequestHdr *)msg;
2911  switch( req->requestid )
2912  {
2913  //------------------------------------------------------------------------
2914  // kXR_open
2915  //------------------------------------------------------------------------
2916  case kXR_open:
2917  {
2918  ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2919  o << "kXR_open (";
2920  char *fn = GetDataAsString( msg );
2921  o << "file: " << fn << ", ";
2922  delete [] fn;
2923  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2924  o << std::setbase(10);
2925  o << "flags: ";
2926  if( sreq->options == 0 )
2927  o << "none";
2928  else
2929  {
2930  if( sreq->options & kXR_delete )
2931  o << "kXR_delete ";
2932  if( sreq->options & kXR_force )
2933  o << "kXR_force ";
2934  if( sreq->options & kXR_mkpath )
2935  o << "kXR_mkpath ";
2936  if( sreq->options & kXR_new )
2937  o << "kXR_new ";
2938  if( sreq->options & kXR_nowait )
2939  o << "kXR_delete ";
2940  if( sreq->options & kXR_open_apnd )
2941  o << "kXR_open_apnd ";
2942  if( sreq->options & kXR_open_read )
2943  o << "kXR_open_read ";
2944  if( sreq->options & kXR_open_updt )
2945  o << "kXR_open_updt ";
2946  if( sreq->options & kXR_posc )
2947  o << "kXR_posc ";
2948  if( sreq->options & kXR_refresh )
2949  o << "kXR_refresh ";
2950  if( sreq->options & kXR_replica )
2951  o << "kXR_replica ";
2952  if( sreq->options & kXR_seqio )
2953  o << "kXR_seqio ";
2954  if( sreq->options & kXR_async )
2955  o << "kXR_async ";
2956  if( sreq->options & kXR_retstat )
2957  o << "kXR_retstat ";
2958  }
2959  o << ")";
2960  break;
2961  }
2962 
2963  //------------------------------------------------------------------------
2964  // kXR_close
2965  //------------------------------------------------------------------------
2966  case kXR_close:
2967  {
2968  ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
2969  o << "kXR_close (";
2970  o << "handle: " << FileHandleToStr( sreq->fhandle );
2971  o << ")";
2972  break;
2973  }
2974 
2975  //------------------------------------------------------------------------
2976  // kXR_stat
2977  //------------------------------------------------------------------------
2978  case kXR_stat:
2979  {
2980  ClientStatRequest *sreq = (ClientStatRequest *)msg;
2981  o << "kXR_stat (";
2982  if( sreq->dlen )
2983  {
2984  char *fn = GetDataAsString( msg );;
2985  o << "path: " << fn << ", ";
2986  delete [] fn;
2987  }
2988  else
2989  {
2990  o << "handle: " << FileHandleToStr( sreq->fhandle );
2991  o << ", ";
2992  }
2993  o << "flags: ";
2994  if( sreq->options == 0 )
2995  o << "none";
2996  else
2997  {
2998  if( sreq->options & kXR_vfs )
2999  o << "kXR_vfs";
3000  }
3001  o << ")";
3002  break;
3003  }
3004 
3005  //------------------------------------------------------------------------
3006  // kXR_read
3007  //------------------------------------------------------------------------
3008  case kXR_read:
3009  {
3010  ClientReadRequest *sreq = (ClientReadRequest *)msg;
3011  o << "kXR_read (";
3012  o << "handle: " << FileHandleToStr( sreq->fhandle );
3013  o << std::setbase(10);
3014  o << ", ";
3015  o << "offset: " << sreq->offset << ", ";
3016  o << "size: " << sreq->rlen << ")";
3017  break;
3018  }
3019 
3020  //------------------------------------------------------------------------
3021  // kXR_pgread
3022  //------------------------------------------------------------------------
3023  case kXR_pgread:
3024  {
3026  o << "kXR_pgread (";
3027  o << "handle: " << FileHandleToStr( sreq->fhandle );
3028  o << std::setbase(10);
3029  o << ", ";
3030  o << "offset: " << sreq->offset << ", ";
3031  o << "size: " << sreq->rlen << ")";
3032  break;
3033  }
3034 
3035  //------------------------------------------------------------------------
3036  // kXR_write
3037  //------------------------------------------------------------------------
3038  case kXR_write:
3039  {
3040  ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3041  o << "kXR_write (";
3042  o << "handle: " << FileHandleToStr( sreq->fhandle );
3043  o << std::setbase(10);
3044  o << ", ";
3045  o << "offset: " << sreq->offset << ", ";
3046  o << "size: " << sreq->dlen << ")";
3047  break;
3048  }
3049 
3050  //------------------------------------------------------------------------
3051  // kXR_pgwrite
3052  //------------------------------------------------------------------------
3053  case kXR_pgwrite:
3054  {
3056  o << "kXR_pgwrite (";
3057  o << "handle: " << FileHandleToStr( sreq->fhandle );
3058  o << std::setbase(10);
3059  o << ", ";
3060  o << "offset: " << sreq->offset << ", ";
3061  o << "size: " << sreq->dlen << ")";
3062  break;
3063  }
3064 
3065  //------------------------------------------------------------------------
3066  // kXR_sync
3067  //------------------------------------------------------------------------
3068  case kXR_sync:
3069  {
3070  ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3071  o << "kXR_sync (";
3072  o << "handle: " << FileHandleToStr( sreq->fhandle );
3073  o << ")";
3074  break;
3075  }
3076 
3077  //------------------------------------------------------------------------
3078  // kXR_truncate
3079  //------------------------------------------------------------------------
3080  case kXR_truncate:
3081  {
3083  o << "kXR_truncate (";
3084  if( !sreq->dlen )
3085  o << "handle: " << FileHandleToStr( sreq->fhandle );
3086  else
3087  {
3088  char *fn = GetDataAsString( msg );
3089  o << "file: " << fn;
3090  delete [] fn;
3091  }
3092  o << std::setbase(10);
3093  o << ", ";
3094  o << "offset: " << sreq->offset;
3095  o << ")";
3096  break;
3097  }
3098 
3099  //------------------------------------------------------------------------
3100  // kXR_readv
3101  //------------------------------------------------------------------------
3102  case kXR_readv:
3103  {
3104  unsigned char *fhandle = 0;
3105  o << "kXR_readv (";
3106 
3107  o << "handle: ";
3108  readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3109  fhandle = dataChunk[0].fhandle;
3110  if( fhandle )
3111  o << FileHandleToStr( fhandle );
3112  else
3113  o << "unknown";
3114  o << ", ";
3115  o << std::setbase(10);
3116  o << "chunks: [";
3117  uint64_t size = 0;
3118  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3119  {
3120  size += dataChunk[i].rlen;
3121  o << "(offset: " << dataChunk[i].offset;
3122  o << ", size: " << dataChunk[i].rlen << "); ";
3123  }
3124  o << "], ";
3125  o << "total size: " << size << ")";
3126  break;
3127  }
3128 
3129  //------------------------------------------------------------------------
3130  // kXR_writev
3131  //------------------------------------------------------------------------
3132  case kXR_writev:
3133  {
3134  unsigned char *fhandle = 0;
3135  o << "kXR_writev (";
3136 
3137  XrdProto::write_list *wrtList =
3138  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3139  uint64_t size = 0;
3140  uint32_t numChunks = 0;
3141  for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3142  {
3143  fhandle = wrtList[i].fhandle;
3144  size += wrtList[i].wlen;
3145  ++numChunks;
3146  }
3147  o << "handle: ";
3148  if( fhandle )
3149  o << FileHandleToStr( fhandle );
3150  else
3151  o << "unknown";
3152  o << ", ";
3153  o << std::setbase(10);
3154  o << "chunks: " << numChunks << ", ";
3155  o << "total size: " << size << ")";
3156  break;
3157  }
3158 
3159  //------------------------------------------------------------------------
3160  // kXR_locate
3161  //------------------------------------------------------------------------
3162  case kXR_locate:
3163  {
3165  char *fn = GetDataAsString( msg );;
3166  o << "kXR_locate (";
3167  o << "path: " << fn << ", ";
3168  delete [] fn;
3169  o << "flags: ";
3170  if( sreq->options == 0 )
3171  o << "none";
3172  else
3173  {
3174  if( sreq->options & kXR_refresh )
3175  o << "kXR_refresh ";
3176  if( sreq->options & kXR_prefname )
3177  o << "kXR_prefname ";
3178  if( sreq->options & kXR_nowait )
3179  o << "kXR_nowait ";
3180  if( sreq->options & kXR_force )
3181  o << "kXR_force ";
3182  if( sreq->options & kXR_compress )
3183  o << "kXR_compress ";
3184  }
3185  o << ")";
3186  break;
3187  }
3188 
3189  //------------------------------------------------------------------------
3190  // kXR_mv
3191  //------------------------------------------------------------------------
3192  case kXR_mv:
3193  {
3194  ClientMvRequest *sreq = (ClientMvRequest *)msg;
3195  o << "kXR_mv (";
3196  o << "source: ";
3197  o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3198  o << ", ";
3199  o << "destination: ";
3200  o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3201  o << ")";
3202  break;
3203  }
3204 
3205  //------------------------------------------------------------------------
3206  // kXR_query
3207  //------------------------------------------------------------------------
3208  case kXR_query:
3209  {
3210  ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3211  o << "kXR_query (";
3212  o << "code: ";
3213  switch( sreq->infotype )
3214  {
3215  case kXR_Qconfig: o << "kXR_Qconfig"; break;
3216  case kXR_Qckscan: o << "kXR_Qckscan"; break;
3217  case kXR_Qcksum: o << "kXR_Qcksum"; break;
3218  case kXR_Qopaque: o << "kXR_Qopaque"; break;
3219  case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3220  case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3221  case kXR_QPrep: o << "kXR_QPrep"; break;
3222  case kXR_Qspace: o << "kXR_Qspace"; break;
3223  case kXR_QStats: o << "kXR_QStats"; break;
3224  case kXR_Qvisa: o << "kXR_Qvisa"; break;
3225  case kXR_Qxattr: o << "kXR_Qxattr"; break;
3226  default: o << sreq->infotype; break;
3227  }
3228  o << ", ";
3229 
3230  if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3231  {
3232  o << "handle: " << FileHandleToStr( sreq->fhandle );
3233  o << ", ";
3234  }
3235 
3236  o << "arg length: " << sreq->dlen << ")";
3237  break;
3238  }
3239 
3240  //------------------------------------------------------------------------
3241  // kXR_rm
3242  //------------------------------------------------------------------------
3243  case kXR_rm:
3244  {
3245  o << "kXR_rm (";
3246  char *fn = GetDataAsString( msg );;
3247  o << "path: " << fn << ")";
3248  delete [] fn;
3249  break;
3250  }
3251 
3252  //------------------------------------------------------------------------
3253  // kXR_mkdir
3254  //------------------------------------------------------------------------
3255  case kXR_mkdir:
3256  {
3257  ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3258  o << "kXR_mkdir (";
3259  char *fn = GetDataAsString( msg );
3260  o << "path: " << fn << ", ";
3261  delete [] fn;
3262  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3263  o << std::setbase(10);
3264  o << "flags: ";
3265  if( sreq->options[0] == 0 )
3266  o << "none";
3267  else
3268  {
3269  if( sreq->options[0] & kXR_mkdirpath )
3270  o << "kXR_mkdirpath";
3271  }
3272  o << ")";
3273  break;
3274  }
3275 
3276  //------------------------------------------------------------------------
3277  // kXR_rmdir
3278  //------------------------------------------------------------------------
3279  case kXR_rmdir:
3280  {
3281  o << "kXR_rmdir (";
3282  char *fn = GetDataAsString( msg );
3283  o << "path: " << fn << ")";
3284  delete [] fn;
3285  break;
3286  }
3287 
3288  //------------------------------------------------------------------------
3289  // kXR_chmod
3290  //------------------------------------------------------------------------
3291  case kXR_chmod:
3292  {
3293  ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3294  o << "kXR_chmod (";
3295  char *fn = GetDataAsString( msg );
3296  o << "path: " << fn << ", ";
3297  delete [] fn;
3298  o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3299  break;
3300  }
3301 
3302  //------------------------------------------------------------------------
3303  // kXR_ping
3304  //------------------------------------------------------------------------
3305  case kXR_ping:
3306  {
3307  o << "kXR_ping ()";
3308  break;
3309  }
3310 
3311  //------------------------------------------------------------------------
3312  // kXR_protocol
3313  //------------------------------------------------------------------------
3314  case kXR_protocol:
3315  {
3317  o << "kXR_protocol (";
3318  o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3319  break;
3320  }
3321 
3322  //------------------------------------------------------------------------
3323  // kXR_dirlist
3324  //------------------------------------------------------------------------
3325  case kXR_dirlist:
3326  {
3327  o << "kXR_dirlist (";
3328  char *fn = GetDataAsString( msg );;
3329  o << "path: " << fn << ")";
3330  delete [] fn;
3331  break;
3332  }
3333 
3334  //------------------------------------------------------------------------
3335  // kXR_set
3336  //------------------------------------------------------------------------
3337  case kXR_set:
3338  {
3339  o << "kXR_set (";
3340  char *fn = GetDataAsString( msg );;
3341  o << "data: " << fn << ")";
3342  delete [] fn;
3343  break;
3344  }
3345 
3346  //------------------------------------------------------------------------
3347  // kXR_prepare
3348  //------------------------------------------------------------------------
3349  case kXR_prepare:
3350  {
3352  o << "kXR_prepare (";
3353  o << "flags: ";
3354 
3355  if( sreq->options == 0 )
3356  o << "none";
3357  else
3358  {
3359  if( sreq->options & kXR_stage )
3360  o << "kXR_stage ";
3361  if( sreq->options & kXR_wmode )
3362  o << "kXR_wmode ";
3363  if( sreq->options & kXR_coloc )
3364  o << "kXR_coloc ";
3365  if( sreq->options & kXR_fresh )
3366  o << "kXR_fresh ";
3367  }
3368 
3369  o << ", priority: " << (int) sreq->prty << ", ";
3370 
3371  char *fn = GetDataAsString( msg );
3372  char *cursor;
3373  for( cursor = fn; *cursor; ++cursor )
3374  if( *cursor == '\n' ) *cursor = ' ';
3375 
3376  o << "paths: " << fn << ")";
3377  delete [] fn;
3378  break;
3379  }
3380 
3381  case kXR_chkpoint:
3382  {
3384  o << "kXR_chkpoint (";
3385  o << "opcode: ";
3386  if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3387  else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3388  else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3389  else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3390  else if( sreq->opcode == kXR_ckpXeq )
3391  {
3392  o << "kXR_ckpXeq) ";
3393  // In this case our request body will be one of kXR_pgwrite,
3394  // kXR_truncate, kXR_write, or kXR_writev request.
3395  GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3396  }
3397 
3398  break;
3399  }
3400 
3401  //------------------------------------------------------------------------
3402  // Default
3403  //------------------------------------------------------------------------
3404  default:
3405  {
3406  o << "kXR_unknown (length: " << req->dlen << ")";
3407  break;
3408  }
3409  };
3410  }
3411 
3412  //----------------------------------------------------------------------------
3413  // Get a string representation of file handle
3414  //----------------------------------------------------------------------------
3415  std::string XRootDTransport::FileHandleToStr( const unsigned char handle[4] )
3416  {
3417  std::ostringstream o;
3418  o << "0x";
3419  for( uint8_t i = 0; i < 4; ++i )
3420  {
3421  o << std::setbase(16) << std::setfill('0') << std::setw(2);
3422  o << (int)handle[i];
3423  }
3424  return o.str();
3425  }
3426 }
kXR_int32 dlen
Definition: XProtocol.hh:171
static const int kXR_ckpRollback
Definition: XProtocol.hh:215
@ kXR_NotFound
Definition: XProtocol.hh:1001
kXR_int16 arg1len
Definition: XProtocol.hh:430
#define kXR_isManager
Definition: XProtocol.hh:1156
struct ClientTruncateRequest truncate
Definition: XProtocol.hh:875
union ServerResponse::@0 body
@ kXR_ecredir
Definition: XProtocol.hh:371
#define kXR_tlsLogin
Definition: XProtocol.hh:1184
#define kXR_suppgrw
Definition: XProtocol.hh:1174
kXR_int32 dlen
Definition: XProtocol.hh:182
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_unt16 requestid
Definition: XProtocol.hh:394
ServerResponseStatus status
Definition: XProtocol.hh:1310
kXR_char fhandle[4]
Definition: XProtocol.hh:782
#define kXR_gotoTLS
Definition: XProtocol.hh:1180
#define kXR_attrMeta
Definition: XProtocol.hh:1159
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
kXR_char fhandle[4]
Definition: XProtocol.hh:807
#define kXR_haveTLS
Definition: XProtocol.hh:1179
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char fhandle[4]
Definition: XProtocol.hh:771
struct ClientMkdirRequest mkdir
Definition: XProtocol.hh:858
kXR_int32 dlen
Definition: XProtocol.hh:431
struct ClientAuthRequest auth
Definition: XProtocol.hh:847
kXR_int64 offset
Definition: XProtocol.hh:646
kXR_char streamid[2]
Definition: XProtocol.hh:914
kXR_unt16 options
Definition: XProtocol.hh:481
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
#define kXR_attrSuper
Definition: XProtocol.hh:1161
struct ClientReadVRequest readv
Definition: XProtocol.hh:868
kXR_char pathid
Definition: XProtocol.hh:653
kXR_char credtype[4]
Definition: XProtocol.hh:170
kXR_char username[8]
Definition: XProtocol.hh:396
@ kXR_compress
Definition: XProtocol.hh:452
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_prefname
Definition: XProtocol.hh:461
@ kXR_nowait
Definition: XProtocol.hh:467
@ kXR_open_read
Definition: XProtocol.hh:456
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_mkpath
Definition: XProtocol.hh:460
@ kXR_seqio
Definition: XProtocol.hh:468
@ kXR_replica
Definition: XProtocol.hh:465
@ kXR_posc
Definition: XProtocol.hh:466
@ kXR_refresh
Definition: XProtocol.hh:459
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_force
Definition: XProtocol.hh:454
@ kXR_open_apnd
Definition: XProtocol.hh:462
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientOpenRequest open
Definition: XProtocol.hh:860
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_status
Definition: XProtocol.hh:907
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_authmore
Definition: XProtocol.hh:902
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_char fhandle[4]
Definition: XProtocol.hh:509
kXR_unt16 infotype
Definition: XProtocol.hh:631
kXR_int32 fourth
Definition: XProtocol.hh:87
kXR_char fhandle[4]
Definition: XProtocol.hh:645
kXR_char fhandle[4]
Definition: XProtocol.hh:659
struct ClientWriteVRequest writev
Definition: XProtocol.hh:877
kXR_char fhandle[4]
Definition: XProtocol.hh:229
struct ClientLoginRequest login
Definition: XProtocol.hh:857
kXR_unt16 requestid
Definition: XProtocol.hh:157
kXR_char fhandle[4]
Definition: XProtocol.hh:633
kXR_char sessid[16]
Definition: XProtocol.hh:181
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_bind
Definition: XProtocol.hh:136
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_login
Definition: XProtocol.hh:119
@ kXR_auth
Definition: XProtocol.hh:112
@ kXR_endsess
Definition: XProtocol.hh:135
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_1stRequest
Definition: XProtocol.hh:111
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_ping
Definition: XProtocol.hh:123
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
@ kXR_prepare
Definition: XProtocol.hh:133
struct ClientChmodRequest chmod
Definition: XProtocol.hh:850
#define kXR_isServer
Definition: XProtocol.hh:1157
#define kXR_attrCache
Definition: XProtocol.hh:1158
kXR_int32 protover
Definition: XProtocol.hh:95
struct ClientQueryRequest query
Definition: XProtocol.hh:866
kXR_int32 dlen
Definition: XProtocol.hh:648
struct ClientReadRequest read
Definition: XProtocol.hh:867
struct ClientMvRequest mv
Definition: XProtocol.hh:859
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_unt16 requestid
Definition: XProtocol.hh:180
kXR_char sessid[16]
Definition: XProtocol.hh:259
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
kXR_char fhandle[4]
Definition: XProtocol.hh:794
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1261
kXR_unt16 mode
Definition: XProtocol.hh:480
@ kXR_asyncap
Definition: XProtocol.hh:378
#define kXR_attrProxy
Definition: XProtocol.hh:1160
kXR_char options[1]
Definition: XProtocol.hh:416
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
static const int kXR_ckpCommit
Definition: XProtocol.hh:213
kXR_int64 offset
Definition: XProtocol.hh:661
@ kXR_vfs
Definition: XProtocol.hh:763
struct ClientPrepareRequest prepare
Definition: XProtocol.hh:864
@ kXR_mkdirpath
Definition: XProtocol.hh:410
@ kXR_wmode
Definition: XProtocol.hh:591
@ kXR_fresh
Definition: XProtocol.hh:593
@ kXR_coloc
Definition: XProtocol.hh:592
@ kXR_stage
Definition: XProtocol.hh:590
static const int kXR_ckpQuery
Definition: XProtocol.hh:214
#define kXR_tlsSess
Definition: XProtocol.hh:1185
#define kXR_DataServer
Definition: XProtocol.hh:1150
kXR_int64 offset
Definition: XProtocol.hh:808
struct ClientWriteRequest write
Definition: XProtocol.hh:876
#define kXR_PROTTLSVERSION
Definition: XProtocol.hh:72
kXR_int32 dlen
Definition: XProtocol.hh:772
kXR_char options
Definition: XProtocol.hh:769
kXR_char capver[1]
Definition: XProtocol.hh:399
kXR_int32 rlen
Definition: XProtocol.hh:647
struct ClientProtocolRequest protocol
Definition: XProtocol.hh:865
@ kXR_QPrep
Definition: XProtocol.hh:616
@ kXR_Qopaqug
Definition: XProtocol.hh:625
@ kXR_Qconfig
Definition: XProtocol.hh:621
@ kXR_Qopaquf
Definition: XProtocol.hh:624
@ kXR_Qckscan
Definition: XProtocol.hh:620
@ kXR_Qxattr
Definition: XProtocol.hh:618
@ kXR_Qspace
Definition: XProtocol.hh:619
@ kXR_Qvisa
Definition: XProtocol.hh:622
@ kXR_QStats
Definition: XProtocol.hh:615
@ kXR_Qcksum
Definition: XProtocol.hh:617
@ kXR_Qopaque
Definition: XProtocol.hh:623
struct ClientLocateRequest locate
Definition: XProtocol.hh:856
@ kXR_ver005
Definition: XProtocol.hh:389
kXR_int32 msgval
Definition: XProtocol.hh:96
#define kXR_tlsData
Definition: XProtocol.hh:1182
@ kXR_readrdok
Definition: XProtocol.hh:360
@ kXR_fullurl
Definition: XProtocol.hh:358
@ kXR_onlyprv4
Definition: XProtocol.hh:362
@ kXR_lclfile
Definition: XProtocol.hh:364
@ kXR_multipr
Definition: XProtocol.hh:359
@ kXR_redirflags
Definition: XProtocol.hh:365
@ kXR_hasipv64
Definition: XProtocol.hh:361
@ kXR_onlyprv6
Definition: XProtocol.hh:363
kXR_int32 dlen
Definition: XProtocol.hh:159
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
static const int kXR_ckpBegin
Definition: XProtocol.hh:212
long long kXR_int64
Definition: XPtypes.hh:98
unsigned char kXR_char
Definition: XPtypes.hh:65
XrdVERSIONINFOREF(XrdCl)
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
Definition: XrdClBuffer.hh:228
void Zero()
Zero.
Definition: XrdClBuffer.hh:124
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
Definition: XrdClBuffer.hh:88
void Allocate(uint32_t size)
Allocate the buffer.
Definition: XrdClBuffer.hh:110
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition: XrdClEnv.cc:110
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
@ ErrorMsg
report errors
Definition: XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition: XrdClLog.hh:258
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
Definition: XrdClMessage.hh:81
bool IsMarshalled() const
Check if the message is marshalled.
Definition: XrdClMessage.hh:73
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
Definition: XrdClSocket.cc:740
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Definition: XrdClTls.cc:422
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition: XrdClURL.hh:31
std::string GetChannelId() const
Definition: XrdClURL.cc:512
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
bool IsSecure() const
Does the protocol indicate encryption.
Definition: XrdClURL.cc:482
bool IsTPC() const
Is the URL used in TPC context.
Definition: XrdClURL.cc:490
std::string GetLoginToken() const
Get the login token if present in the opaque info.
Definition: XrdClURL.cc:367
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
Definition: XrdClUtils.cc:256
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
Definition: XrdClUtils.cc:490
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
Definition: XrdClUtils.cc:635
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return a number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
void SetTLS(bool val)
Definition: XrdNetAddr.cc:590
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
Definition: XrdNetUtils.cc:667
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:681
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
static int TimeZone()
Definition: XrdSysTimer.cc:210
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errLoginFailed
Definition: XrdClStatus.hh:87
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
const uint64_t XRootDMsg
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
Definition: XrdClStatus.hh:86
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t suDone
Definition: XrdClStatus.hh:38
const uint16_t suContinue
Definition: XrdClStatus.hh:39
bool InitTLS()
Definition: XrdClTls.cc:96
const int DefaultTlsNoData
const int DefaultNoTlsOK
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
XrdSysError Log
Definition: XrdConfig.cc:113
kXR_char fhandle[4]
Definition: XProtocol.hh:832
struct ServerResponseBifs_Protocol bifReqs
Definition: XProtocol.hh:1120
BindPrefSelector(std::vector< std::string > &&bindprefs)
const std::string & Get()
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
std::set< std::string > protocols
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
Selects the least loaded stream for read operations over multiple streams.
void AdjustQueues(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
Information holder for xrootd channels.
std::vector< XRootDStreamInfo > StreamInfoVector
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
std::unique_ptr< BindPrefSelector > bindSelector
std::atomic< uint32_t > finstcnt
ServerResponseBody_Protocol * protRespBody
std::set< uint16_t > sentOpens
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted
Information holder for XRootDStreams.
Generic structure to pass security information back and forth.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.