XRootD
XrdClStream.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClStream.hh"
26 #include "XrdCl/XrdClSocket.hh"
27 #include "XrdCl/XrdClChannel.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClLog.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCl/XrdClOutQueue.hh"
34 #include "XrdCl/XrdClMonitor.hh"
39 
40 #include <sys/types.h>
41 #include <algorithm>
42 #include <sys/socket.h>
43 #include <sys/time.h>
44 
45 namespace XrdCl
46 {
47  //----------------------------------------------------------------------------
48  // Statics
49  //----------------------------------------------------------------------------
50  RAtomic_uint64_t Stream::sSessCntGen{0};
51 
52  //----------------------------------------------------------------------------
53  // Incoming message helper
54  //----------------------------------------------------------------------------
56  {
57  InMessageHelper( Message *message = 0,
58  MsgHandler *hndlr = 0,
59  time_t expir = 0,
60  uint16_t actio = 0 ):
61  msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62  void Reset()
63  {
64  msg = 0; handler = 0; expires = 0; action = 0;
65  }
68  time_t expires;
69  uint16_t action;
70  };
71 
72  //----------------------------------------------------------------------------
73  // Sub stream helper
74  //----------------------------------------------------------------------------
76  {
77  SubStreamData(): socket( 0 ), status( Socket::Disconnected )
78  {
79  outQueue = new OutQueue();
80  }
82  {
83  delete socket;
84  delete outQueue;
85  }
91  };
92 
93  //----------------------------------------------------------------------------
94  // Constructor
95  //----------------------------------------------------------------------------
96  Stream::Stream( const URL *url, const URL &prefer ):
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pBytesSent( 0 ),
111  pBytesReceived( 0 )
112  {
113  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115 
116  std::ostringstream o;
117  o << pUrl->GetHostId();
118  pStreamName = o.str();
119 
120  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126 
127  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129 
130  pAddressType = Utils::String2AddressType( netStack );
131  if( pAddressType == Utils::AddressType::IPAuto )
132  {
133  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
134  if( !( stacks & XrdNetUtils::hasIP64 ) )
135  {
136  if( stacks & XrdNetUtils::hasIPv4 )
137  pAddressType = Utils::AddressType::IPv4;
138  else if( stacks & XrdNetUtils::hasIPv6 )
139  pAddressType = Utils::AddressType::IPv6;
140  }
141  }
142 
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146  "Window: %d", pStreamName.c_str(), netStack.c_str(),
147  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148  }
149 
150  //----------------------------------------------------------------------------
151  // Destructor
152  //----------------------------------------------------------------------------
154  {
155  Disconnect( true );
156 
157  Log *log = DefaultEnv::GetLog();
158  log->Debug( PostMasterMsg, "[%s] Destroying stream",
159  pStreamName.c_str() );
160 
161  MonitorDisconnection( XRootDStatus() );
162 
163  SubStreamList::iterator it;
164  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165  delete *it;
166  }
167 
168  //----------------------------------------------------------------------------
169  // Initializer
170  //----------------------------------------------------------------------------
172  {
173  if( !pTransport || !pPoller || !pChannelData )
175 
176  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177  pChannelData, 0, this );
178  pSubStreams.push_back( new SubStreamData() );
179  pSubStreams[0]->socket = s;
180  return XRootDStatus();
181  }
182 
183  //------------------------------------------------------------------------
184  // Make sure that the underlying socket handler gets write readiness
185  // events
186  //------------------------------------------------------------------------
188  {
189  XrdSysMutexHelper scopedLock( pMutex );
190 
191  //--------------------------------------------------------------------------
192  // We are in the process of connecting the main stream, so we do nothing
193  // because when the main stream connection is established it will connect
194  // all the other streams
195  //--------------------------------------------------------------------------
196  if( pSubStreams[0]->status == Socket::Connecting )
197  return XRootDStatus();
198 
199  //--------------------------------------------------------------------------
200  // The main stream is connected, so we can verify whether we have
201  // the up and the down stream connected and ready to handle data.
202  // If anything is not right we fall back to stream 0.
203  //--------------------------------------------------------------------------
204  if( pSubStreams[0]->status == Socket::Connected )
205  {
206  if( pSubStreams[path.down]->status != Socket::Connected )
207  path.down = 0;
208 
209  if( pSubStreams[path.up]->status == Socket::Disconnected )
210  {
211  path.up = 0;
212  return pSubStreams[0]->socket->EnableUplink();
213  }
214 
215  if( pSubStreams[path.up]->status == Socket::Connected )
216  return pSubStreams[path.up]->socket->EnableUplink();
217 
218  return XRootDStatus();
219  }
220 
221  //--------------------------------------------------------------------------
222  // The main stream is not connected, we need to check whether enough time
223  // has passed since we last encountered an error (if any) so that we could
224  // re-attempt the connection
225  //--------------------------------------------------------------------------
226  Log *log = DefaultEnv::GetLog();
227  time_t now = ::time(0);
228 
229  if( now-pLastStreamError < pStreamErrorWindow )
230  return pLastFatalError;
231 
232  gettimeofday( &pConnectionStarted, 0 );
233  ++pConnectionCount;
234 
235  //--------------------------------------------------------------------------
236  // Resolve all the addresses of the host we're supposed to connect to
237  //--------------------------------------------------------------------------
238  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239  if( !st.IsOK() )
240  {
241  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242  "the host", pStreamName.c_str() );
243  pLastStreamError = now;
244  st.status = stFatal;
245  pLastFatalError = st;
246  return st;
247  }
248 
249  if( pPrefer.IsValid() )
250  {
251  std::vector<XrdNetAddr> addrresses;
252  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253  if( !st.IsOK() )
254  {
255  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257  }
258  else
259  {
260  std::vector<XrdNetAddr> tmp;
261  tmp.reserve( pAddresses.size() );
262  // first add all remaining addresses
263  auto itr = pAddresses.begin();
264  for( ; itr != pAddresses.end() ; ++itr )
265  {
266  if( !HasNetAddr( *itr, addrresses ) )
267  tmp.push_back( *itr );
268  }
269  // then copy all 'preferred' addresses
270  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271  // and keep the result
272  pAddresses.swap( tmp );
273  }
274  }
275 
277  pAddresses );
278 
279  while( !pAddresses.empty() )
280  {
281  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282  pAddresses.pop_back();
283  pConnectionInitTime = ::time( 0 );
284  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285  if( st.IsOK() )
286  {
287  pSubStreams[0]->status = Socket::Connecting;
288  break;
289  }
290  }
291  return st;
292  }
293 
294  //----------------------------------------------------------------------------
295  // Queue the message for sending
296  //----------------------------------------------------------------------------
298  MsgHandler *handler,
299  bool stateful,
300  time_t expires )
301  {
302  XrdSysMutexHelper scopedLock( pMutex );
303  Log *log = DefaultEnv::GetLog();
304 
305  //--------------------------------------------------------------------------
306  // Check the session ID and bounce if needed
307  //--------------------------------------------------------------------------
308  if( msg->GetSessionId() &&
309  (pSubStreams[0]->status != Socket::Connected ||
310  pSessionId != msg->GetSessionId()) )
312 
313  //--------------------------------------------------------------------------
314  // Decide on the path to send the message
315  //--------------------------------------------------------------------------
316  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317  if( pSubStreams.size() <= path.up )
318  {
319  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320  "substream %d, using 0 instead", pStreamName.c_str(),
321  msg->GetObfuscatedDescription().c_str(), path.up );
322  path.up = 0;
323  }
324 
325  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326  "substream %d expecting answer at %d", pStreamName.c_str(),
327  msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328 
329  //--------------------------------------------------------------------------
330  // Enable *a* path and insert the message to the right queue
331  //--------------------------------------------------------------------------
332  XRootDStatus st = EnableLink( path );
333  if( st.IsOK() )
334  {
335  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337  expires, stateful );
338  }
339  else
340  st.status = stFatal;
341  return st;
342  }
343 
344  //----------------------------------------------------------------------------
345  // Force connection
346  //----------------------------------------------------------------------------
348  {
349  XrdSysMutexHelper scopedLock( pMutex );
350  if( pSubStreams[0]->status == Socket::Connecting )
351  {
352  pSubStreams[0]->status = Socket::Disconnected;
353  XrdCl::PathID path( 0, 0 );
354  XrdCl::XRootDStatus st = EnableLink( path );
355  if( !st.IsOK() )
356  OnConnectError( 0, st );
357  }
358  }
359 
360  //----------------------------------------------------------------------------
361  // Disconnect the stream
362  //----------------------------------------------------------------------------
363  void Stream::Disconnect( bool /*force*/ )
364  {
365  XrdSysMutexHelper scopedLock( pMutex );
366  SubStreamList::iterator it;
367  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368  {
369  (*it)->socket->Close();
370  (*it)->status = Socket::Disconnected;
371  }
372  }
373 
374  //----------------------------------------------------------------------------
375  // Handle a clock event
376  //----------------------------------------------------------------------------
377  void Stream::Tick( time_t now )
378  {
379  //--------------------------------------------------------------------------
380  // Check for timed-out requests and incoming handlers
381  //--------------------------------------------------------------------------
382  pMutex.Lock();
383  OutQueue q;
384  SubStreamList::iterator it;
385  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386  q.GrabExpired( *(*it)->outQueue, now );
387  pMutex.UnLock();
388 
390  pIncomingQueue->ReportTimeout( now );
391  }
392 }
393 
394 //------------------------------------------------------------------------------
395 // Handle message timeouts and reconnection in the future
396 //------------------------------------------------------------------------------
397 namespace
398 {
399  class StreamConnectorTask: public XrdCl::Task
400  {
401  public:
402  //------------------------------------------------------------------------
403  // Constructor
404  //------------------------------------------------------------------------
405  StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
406  url( url )
407  {
408  std::string name = "StreamConnectorTask for ";
409  name += n;
410  SetName( name );
411  }
412 
413  //------------------------------------------------------------------------
414  // Run the task
415  //------------------------------------------------------------------------
416  time_t Run( time_t )
417  {
419  return 0;
420  }
421 
422  private:
423  XrdCl::URL url;
424  };
425 }
426 
427 namespace XrdCl
428 {
429  XRootDStatus Stream::RequestClose( Message &response )
430  {
431  ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
432  if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
433  Message *msg;
434  ClientCloseRequest *req;
435  MessageUtils::CreateRequest( msg, req );
436  req->requestid = kXR_close;
437  memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
439  msg->SetSessionId( pSessionId );
440  NullResponseHandler *handler = new NullResponseHandler();
441  MessageSendParams params;
442  params.timeout = 0;
443  params.followRedirects = false;
444  params.stateful = true;
446  return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
447  }
448 
449  //------------------------------------------------------------------------
450  // Check if message is a partial response
451  //------------------------------------------------------------------------
452  bool Stream::IsPartial( Message &msg )
453  {
454  ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
455  if( rsphdr->status == kXR_oksofar )
456  return true;
457 
458  if( rsphdr->status == kXR_status )
459  {
460  ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
462  return true;
463  }
464 
465  return false;
466  }
467 
468  //----------------------------------------------------------------------------
469  // Call back when a message has been reconstructed
470  //----------------------------------------------------------------------------
471  void Stream::OnIncoming( uint16_t subStream,
472  std::shared_ptr<Message> msg,
473  uint32_t bytesReceived )
474  {
475  msg->SetSessionId( pSessionId );
476  pBytesReceived += bytesReceived;
477 
478  MsgHandler *handler = nullptr;
479  uint16_t action = 0;
480  {
481  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482  handler = mh.handler;
483  action = mh.action;
484  mh.Reset();
485  }
486 
487  if( !IsPartial( *msg ) )
488  {
489  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490  *pChannelData );
491  if( streamAction & TransportHandler::DigestMsg )
492  return;
493 
494  if( streamAction & TransportHandler::RequestClose )
495  {
496  RequestClose( *msg );
497  return;
498  }
499  }
500 
501  Log *log = DefaultEnv::GetLog();
502 
503  //--------------------------------------------------------------------------
504  // No handler, we discard the message ...
505  //--------------------------------------------------------------------------
506  if( !handler )
507  {
508  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511  pStreamName.c_str(), msg.get(), rsp->hdr.status,
512  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513  return;
514  }
515 
516  //--------------------------------------------------------------------------
517  // We have a handler, so we call the callback
518  //--------------------------------------------------------------------------
519  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520  pStreamName.c_str(), msg.get() );
521 
523  {
524  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526 
527  // if we are handling partial response we have to take down the timeout fence
528  if( IsPartial( *msg ) )
529  {
530  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531  if( xrdHandler ) xrdHandler->PartialReceived();
532  }
533 
534  return;
535  }
536 
537  Job *job = new HandleIncMsgJob( handler );
538  pJobManager->QueueJob( job );
539  }
540 
541  //----------------------------------------------------------------------------
542  // Call when one of the sockets is ready to accept a new message
543  //----------------------------------------------------------------------------
544  std::pair<Message *, MsgHandler *>
545  Stream::OnReadyToWrite( uint16_t subStream )
546  {
547  XrdSysMutexHelper scopedLock( pMutex );
548  Log *log = DefaultEnv::GetLog();
549  if( pSubStreams[subStream]->outQueue->IsEmpty() )
550  {
551  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552  pSubStreams[subStream]->socket->GetStreamName().c_str() );
553 
554  pSubStreams[subStream]->socket->DisableUplink();
555  return std::make_pair( (Message *)0, (MsgHandler *)0 );
556  }
557 
558  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560  h.expires,
561  h.stateful );
562 
563  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564  "from out-queue to in-queue, starting to send outgoing.",
565  pUrl->GetHostId().c_str(), h.handler,
566  h.msg->GetObfuscatedDescription().c_str() );
567 
568  scopedLock.UnLock();
569 
570  if( h.handler )
571  {
572  bool rmMsg = false;
573  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574  if( rmMsg )
575  {
576  Log *log = DefaultEnv::GetLog();
577  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578  pStreamName.c_str() );
579  }
580  h.handler->OnReadyToSend( h.msg );
581  }
582  return std::make_pair( h.msg, h.handler );
583  }
584 
585  void Stream::DisableIfEmpty( uint16_t subStream )
586  {
587  XrdSysMutexHelper scopedLock( pMutex );
588  Log *log = DefaultEnv::GetLog();
589 
590  if( pSubStreams[subStream]->outQueue->IsEmpty() )
591  {
592  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593  pSubStreams[subStream]->socket->GetStreamName().c_str() );
594  pSubStreams[subStream]->socket->DisableUplink();
595  }
596  }
597 
598  //----------------------------------------------------------------------------
599  // Call when a message is written to the socket
600  //----------------------------------------------------------------------------
601  void Stream::OnMessageSent( uint16_t subStream,
602  Message *msg,
603  uint32_t bytesSent )
604  {
605  pTransport->MessageSent( msg, subStream, bytesSent,
606  *pChannelData );
607  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608  pBytesSent += bytesSent;
609  if( h.handler )
610  {
611  // ensure expiration time is assigned if still in queue
612  pIncomingQueue->AssignTimeout( h.handler );
613  // OnStatusReady may cause the handler to delete itself, in
614  // which case the handler or the user callback may also delete msg
615  h.handler->OnStatusReady( msg, XRootDStatus() );
616  }
617  pSubStreams[subStream]->outMsgHelper.Reset();
618  }
619 
620  //----------------------------------------------------------------------------
621 // Call back when a sub-stream has been connected
622  //----------------------------------------------------------------------------
623  void Stream::OnConnect( uint16_t subStream )
624  {
625  XrdSysMutexHelper scopedLock( pMutex );
626  pSubStreams[subStream]->status = Socket::Connected;
627 
628  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629  Log *log = DefaultEnv::GetLog();
630  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631  subStream, ipstack.c_str() );
632 
633  if( subStream == 0 )
634  {
635  pLastStreamError = 0;
636  pLastFatalError = XRootDStatus();
637  pConnectionCount = 0;
638  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639  pSessionId = ++sSessCntGen;
640 
641  //------------------------------------------------------------------------
642  // Create the streams if they don't exist yet
643  //------------------------------------------------------------------------
644  if( pSubStreams.size() == 1 && numSub > 1 )
645  {
646  for( uint16_t i = 1; i < numSub; ++i )
647  {
648  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650  pChannelData, i, this );
651  pSubStreams.push_back( new SubStreamData() );
652  pSubStreams[i]->socket = s;
653  }
654  }
655 
656  //------------------------------------------------------------------------
657  // Connect the extra streams, if we fail we move all the outgoing items
658  // to stream 0, we don't need to enable the uplink here, because it
659  // should be already enabled after the handshaking process is completed.
660  //------------------------------------------------------------------------
661  if( pSubStreams.size() > 1 )
662  {
663  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664  pStreamName.c_str(), pSubStreams.size() - 1 );
665  for( size_t i = 1; i < pSubStreams.size(); ++i )
666  {
667  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669  if( !st.IsOK() )
670  {
671  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672  pSubStreams[i]->socket->Close();
673  }
674  else
675  {
676  pSubStreams[i]->status = Socket::Connecting;
677  }
678  }
679  }
680 
681  //------------------------------------------------------------------------
682  // Inform monitoring
683  //------------------------------------------------------------------------
684  pBytesSent = 0;
685  pBytesReceived = 0;
686  gettimeofday( &pConnectionDone, 0 );
688  if( mon )
689  {
691  i.server = pUrl->GetHostId();
692  i.sTOD = pConnectionStarted;
693  i.eTOD = pConnectionDone;
694  i.streams = pSubStreams.size();
695 
696  AnyObject qryResult;
697  std::string *qryResponse = 0;
698  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699  qryResult.Get( qryResponse );
700  i.auth = *qryResponse;
701  delete qryResponse;
702  mon->Event( Monitor::EvConnect, &i );
703  }
704 
705  //------------------------------------------------------------------------
706  // For every connected control-stream call the global on-connect handler
707  //------------------------------------------------------------------------
709  }
710  else if( pOnDataConnJob )
711  {
712  //------------------------------------------------------------------------
713  // For every connected data-stream call the on-connect handler
714  //------------------------------------------------------------------------
715  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
716  }
717  }
718 
719  //----------------------------------------------------------------------------
720  // On connect error
721  //----------------------------------------------------------------------------
722  void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
723  {
724  XrdSysMutexHelper scopedLock( pMutex );
725  Log *log = DefaultEnv::GetLog();
726  pSubStreams[subStream]->socket->Close();
727  time_t now = ::time(0);
728 
729  //--------------------------------------------------------------------------
730  // For every connection error call the global connection error handler
731  //--------------------------------------------------------------------------
733 
734  //--------------------------------------------------------------------------
735  // If we connected subStream == 0 and cannot connect >0 then we just give
736  // up and move the outgoing messages to another queue
737  //--------------------------------------------------------------------------
738  if( subStream > 0 )
739  {
740  pSubStreams[subStream]->status = Socket::Disconnected;
741  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
742  if( pSubStreams[0]->status == Socket::Connected )
743  {
744  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
745  if( !st.IsOK() )
746  OnFatalError( 0, st, scopedLock );
747  return;
748  }
749 
750  if( pSubStreams[0]->status == Socket::Connecting )
751  return;
752 
753  OnFatalError( subStream, status, scopedLock );
754  return;
755  }
756 
757  //--------------------------------------------------------------------------
758  // Check if we still have time to try and do something in the current window
759  //--------------------------------------------------------------------------
760  time_t elapsed = now-pConnectionInitTime;
761  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
762  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
763 
764  //------------------------------------------------------------------------
765  // If we have some IP addresses left we try them
766  //------------------------------------------------------------------------
767  if( !pAddresses.empty() )
768  {
769  XRootDStatus st;
770  do
771  {
772  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
773  pAddresses.pop_back();
774  pConnectionInitTime = ::time( 0 );
775  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
776  }
777  while( !pAddresses.empty() && !st.IsOK() );
778 
779  if( !st.IsOK() )
780  OnFatalError( subStream, st, scopedLock );
781 
782  return;
783  }
784  //------------------------------------------------------------------------
785  // If we still can retry with the same host name, we sleep until the end
786  // of the connection window and try
787  //------------------------------------------------------------------------
788  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
789  && !status.IsFatal() )
790  {
791  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
792  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
793 
794  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
795  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
796  return;
797  }
798  //--------------------------------------------------------------------------
799  // We are out of the connection window, the only thing we can do here
800  // is re-resolving the host name and retrying if we still can
801  //--------------------------------------------------------------------------
802  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
803  {
804  pAddresses.clear();
805  pSubStreams[0]->status = Socket::Disconnected;
806  PathID path( 0, 0 );
807  XRootDStatus st = EnableLink( path );
808  if( !st.IsOK() )
809  OnFatalError( subStream, st, scopedLock );
810  return;
811  }
812 
813  //--------------------------------------------------------------------------
814  // Else, we fail
815  //--------------------------------------------------------------------------
816  OnFatalError( subStream, status, scopedLock );
817  }
818 
819  //----------------------------------------------------------------------------
820  // Call back when an error has occurred
821  //----------------------------------------------------------------------------
822  void Stream::OnError( uint16_t subStream, XRootDStatus status )
823  {
824  XrdSysMutexHelper scopedLock( pMutex );
825  Log *log = DefaultEnv::GetLog();
826  pSubStreams[subStream]->socket->Close();
827  pSubStreams[subStream]->status = Socket::Disconnected;
828 
829  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
830  pStreamName.c_str(), subStream, status.ToString().c_str() );
831 
832  //--------------------------------------------------------------------------
833  // Reinsert the stuff that we have failed to sent
834  //--------------------------------------------------------------------------
835  if( pSubStreams[subStream]->outMsgHelper.msg )
836  {
837  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
838  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
839  h.stateful );
840  pIncomingQueue->RemoveMessageHandler(h.handler);
841  pSubStreams[subStream]->outMsgHelper.Reset();
842  }
843 
844  //--------------------------------------------------------------------------
845  // Reinsert the receiving handler and reset any partially read partial
846  //--------------------------------------------------------------------------
847  if( pSubStreams[subStream]->inMsgHelper.handler )
848  {
849  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
850  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
851  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
852  if( xrdHandler ) xrdHandler->PartialReceived();
853  h.Reset();
854  }
855 
856  //--------------------------------------------------------------------------
857  // We are dealing with an error of a peripheral stream. If we don't have
858  // anything to send don't bother recovering. Otherwise move the requests
859  // to stream 0 if possible.
860  //--------------------------------------------------------------------------
861  if( subStream > 0 )
862  {
863  if( pSubStreams[subStream]->outQueue->IsEmpty() )
864  return;
865 
866  if( pSubStreams[0]->status != Socket::Disconnected )
867  {
868  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
869  if( pSubStreams[0]->status == Socket::Connected )
870  {
871  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
872  if( !st.IsOK() )
873  OnFatalError( 0, st, scopedLock );
874  return;
875  }
876  }
877  OnFatalError( subStream, status, scopedLock );
878  return;
879  }
880 
881  //--------------------------------------------------------------------------
882  // If we lost the stream 0 we have lost the session, we re-enable the
883  // stream if we still have things in one of the outgoing queues, otherwise
884  // there is no point in recovering at this point.
885  //--------------------------------------------------------------------------
886  if( subStream == 0 )
887  {
888  MonitorDisconnection( status );
889 
890  SubStreamList::iterator it;
891  size_t outstanding = 0;
892  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
893  outstanding += (*it)->outQueue->GetSizeStateless();
894 
895  if( outstanding )
896  {
897  PathID path( 0, 0 );
898  XRootDStatus st = EnableLink( path );
899  if( !st.IsOK() )
900  {
901  OnFatalError( 0, st, scopedLock );
902  return;
903  }
904  }
905 
906  //------------------------------------------------------------------------
907  // We're done here, unlock the stream mutex to avoid deadlocks and
908  // report the disconnection event to the handlers
909  //------------------------------------------------------------------------
910  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
911  "message handlers.", pStreamName.c_str() );
912  OutQueue q;
913  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
914  q.GrabStateful( *(*it)->outQueue );
915  scopedLock.UnLock();
916 
917  q.Report( status );
918  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
919  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
920  return;
921  }
922  }
923 
924  //------------------------------------------------------------------------
925  // Force error
926  //------------------------------------------------------------------------
927  void Stream::ForceError( XRootDStatus status, bool hush )
928  {
929  XrdSysMutexHelper scopedLock( pMutex );
930  Log *log = DefaultEnv::GetLog();
931  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
932  {
933  if( pSubStreams[substream]->status != Socket::Connected ) continue;
934  pSubStreams[substream]->socket->Close();
935  pSubStreams[substream]->status = Socket::Disconnected;
936 
937  if( !hush )
938  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
939  pStreamName.c_str(), status.ToString().c_str() );
940 
941  //--------------------------------------------------------------------
942  // Reinsert the stuff that we have failed to sent
943  //--------------------------------------------------------------------
944  if( pSubStreams[substream]->outMsgHelper.msg )
945  {
946  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
947  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
948  h.stateful );
949  pIncomingQueue->RemoveMessageHandler(h.handler);
950  pSubStreams[substream]->outMsgHelper.Reset();
951  }
952 
953  //--------------------------------------------------------------------
954  // Reinsert the receiving handler and reset any partially read partial
955  //--------------------------------------------------------------------
956  if( pSubStreams[substream]->inMsgHelper.handler )
957  {
958  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
959  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
960  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
961  if( xrdHandler ) xrdHandler->PartialReceived();
962  h.Reset();
963  }
964  }
965 
966  pConnectionCount = 0;
967 
968  //------------------------------------------------------------------------
969  // We're done here, unlock the stream mutex to avoid deadlocks and
970  // report the disconnection event to the handlers
971  //------------------------------------------------------------------------
972  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
973  "message handlers.", pStreamName.c_str() );
974 
975  SubStreamList::iterator it;
976  OutQueue q;
977  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
978  q.GrabItems( *(*it)->outQueue );
979  scopedLock.UnLock();
980 
981  q.Report( status );
982 
983  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
984  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
985  }
986 
987  //----------------------------------------------------------------------------
988  // On fatal error
989  //----------------------------------------------------------------------------
990  void Stream::OnFatalError( uint16_t subStream,
991  XRootDStatus status,
992  XrdSysMutexHelper &lock )
993  {
994  Log *log = DefaultEnv::GetLog();
995  pSubStreams[subStream]->status = Socket::Disconnected;
996  log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
997  pStreamName.c_str(), status.ToString().c_str() );
998 
999  //--------------------------------------------------------------------------
1000  // Don't set the stream error windows for authentication errors as the user
1001  // may refresh his credential at any time
1002  //--------------------------------------------------------------------------
1003  if( status.code != errAuthFailed )
1004  {
1005  pConnectionCount = 0;
1006  pLastStreamError = ::time(0);
1007  pLastFatalError = status;
1008  }
1009 
1010  SubStreamList::iterator it;
1011  OutQueue q;
1012  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1013  q.GrabItems( *(*it)->outQueue );
1014  lock.UnLock();
1015 
1016  status.status = stFatal;
1017  q.Report( status );
1018  pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1019  pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1020 
1021  }
1022 
1023  //----------------------------------------------------------------------------
1024  // Inform monitoring about disconnection
1025  //----------------------------------------------------------------------------
1026  void Stream::MonitorDisconnection( XRootDStatus status )
1027  {
1028  Monitor *mon = DefaultEnv::GetMonitor();
1029  if( mon )
1030  {
1031  Monitor::DisconnectInfo i;
1032  i.server = pUrl->GetHostId();
1033  i.rBytes = pBytesReceived;
1034  i.sBytes = pBytesSent;
1035  i.cTime = ::time(0) - pConnectionDone.tv_sec;
1036  i.status = status;
1037  mon->Event( Monitor::EvDisconnect, &i );
1038  }
1039  }
1040 
1041  //----------------------------------------------------------------------------
1042  // On read timeout
//
// Only sub-stream 0 is examined; for any other sub-stream the method returns
// true immediately. It returns false when the stream TTL branch is taken or
// after OnError has been called because the transport reports the stream as
// broken, and true otherwise.
1043  //----------------------------------------------------------------------------
1044  bool Stream::OnReadTimeout( uint16_t substream )
1045  {
1046  //--------------------------------------------------------------------------
1047  // We only take the main stream into account
1048  //--------------------------------------------------------------------------
1049  if( substream != 0 )
1050  return true;
1051 
1052  //--------------------------------------------------------------------------
1053  // Check if there are no outgoing messages and if the stream TTL has elapsed.
1054  // It is assumed that the underlying transport makes sure that there are no
1055  // pending requests that are not answered, i.e. all possible virtual streams
1056  // are de-allocated
1057  //--------------------------------------------------------------------------
1058  Log *log = DefaultEnv::GetLog();
1059  SubStreamList::iterator it;
1060  time_t now = time(0);
1061 
1062  XrdSysMutexHelper scopedLock( pMutex );
1063  uint32_t outgoingMessages = 0;
1064  time_t lastActivity = 0;
// Sum the queued messages of all sub-streams and keep the most recent socket
// activity timestamp among them
1065  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1066  {
1067  outgoingMessages += (*it)->outQueue->GetSize();
1068  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1069  if( lastActivity < sockLastActivity )
1070  lastActivity = sockLastActivity;
1071  }
1072 
1073  if( !outgoingMessages )
1074  {
1075  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1076  *pChannelData );
1077  if( disconnect )
1078  {
1079  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1080  pStreamName.c_str() );
1081  scopedLock.UnLock();
1082  //----------------------------------------------------------------------
1083  // Important note!
1084  //
1085  // This destroys the Stream object itself, the underlying
1086  // AsyncSocketHandler object (that called this method) and the Channel
1087  // object that aggregates this Stream.
1088  //----------------------------------------------------------------------
// NOTE(review): the statement the note above refers to is missing from this
// copy of the file (the embedded listing numbering skips 1089). Presumably it
// is a forced disconnect issued through the post master - restore it from the
// upstream source before relying on this branch.
1090  return false;
1091  }
1092  }
1093 
1094  //--------------------------------------------------------------------------
1095  // Check if the stream is broken
1096  //--------------------------------------------------------------------------
1097  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1098  *pChannelData );
1099  if( !st.IsOK() )
1100  {
// Release the stream mutex before handing the error over to OnError
1101  scopedLock.UnLock();
1102  OnError( substream, st );
1103  return false;
1104  }
1105  return true;
1106  }
1107 
1108  //----------------------------------------------------------------------------
1109  // Call back when a message has been reconstru
1110  //----------------------------------------------------------------------------
1111  bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1112  {
1113  return true;
1114  }
1115 
1116  //----------------------------------------------------------------------------
1117  // Register channel event handler
1118  //----------------------------------------------------------------------------
1120  {
1121  pChannelEvHandlers.AddHandler( handler );
1122  }
1123 
1124  //----------------------------------------------------------------------------
1125  // Remove a channel event handler
1126  //----------------------------------------------------------------------------
1128  {
1129  pChannelEvHandlers.RemoveHandler( handler );
1130  }
1131 
1132  //----------------------------------------------------------------------------
1133  // Install a incoming message handler
1134  //----------------------------------------------------------------------------
1135  MsgHandler*
1136  Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1137  {
1138  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1139  if( !mh.handler )
1140  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1141  mh.expires,
1142  mh.action );
1143 
1144  if( !mh.handler )
1145  return nullptr;
1146 
1147  if( mh.action & MsgHandler::Raw )
1148  return mh.handler;
1149  return nullptr;
1150  }
1151 
1152  //----------------------------------------------------------------------------
// Let the handler attached to the given sub-stream inspect a status response
// and reduce its answer to a single action code.
//
// The flags returned by the handler are merged into the helper's action mask.
// If they contain MsgHandler::RemoveHandler, the handler is removed from the
// incoming queue. The return value is chosen in this order of precedence:
// Raw (incHandler is then set to the handler), Corrupted, More, None.
1156  //----------------------------------------------------------------------------
1157  uint16_t Stream::InspectStatusRsp( uint16_t stream,
1158  MsgHandler *&incHandler )
1159  {
1160  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1161  if( !mh.handler )
// NOTE(review): the statement guarded by the check above is missing from this
// copy of the file (the embedded listing numbering skips 1162); presumably an
// early return for the no-handler case - restore it from the upstream source.
1163 
1164  uint16_t action = mh.handler->InspectStatusRsp();
1165  mh.action |= action;
1166 
1167  if( action & MsgHandler::RemoveHandler )
1168  pIncomingQueue->RemoveMessageHandler( mh.handler );
1169 
1170  if( action & MsgHandler::Raw )
1171  {
1172  incHandler = mh.handler;
1173  return MsgHandler::Raw;
1174  }
1175 
1176  if( action & MsgHandler::Corrupted )
1177  return MsgHandler::Corrupted;
1178 
1179  if( action & MsgHandler::More )
1180  return MsgHandler::More;
1181 
1182  return MsgHandler::None;
1183  }
1184 
1185  //----------------------------------------------------------------------------
1186  // Check if channel can be collapsed using given URL
1187  //----------------------------------------------------------------------------
1188  bool Stream::CanCollapse( const URL &url )
1189  {
1190  Log *log = DefaultEnv::GetLog();
1191 
1192  //--------------------------------------------------------------------------
1193  // Resolve all the addresses of the host we're supposed to connect to
1194  //--------------------------------------------------------------------------
1195  std::vector<XrdNetAddr> prefaddrs;
1196  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1197  if( !st.IsOK() )
1198  {
1199  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1200  , pStreamName.c_str(), url.GetHostName().c_str() );
1201  return false;
1202  }
1203 
1204  //--------------------------------------------------------------------------
1205  // Resolve all the addresses of the alias
1206  //--------------------------------------------------------------------------
1207  std::vector<XrdNetAddr> aliasaddrs;
1208  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1209  if( !st.IsOK() )
1210  {
1211  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1212  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1213  return false;
1214  }
1215 
1216  //--------------------------------------------------------------------------
1217  // Now check if the preferred host is part of the alias
1218  //--------------------------------------------------------------------------
1219  auto itr = prefaddrs.begin();
1220  for( ; itr != prefaddrs.end() ; ++itr )
1221  {
1222  auto itr2 = aliasaddrs.begin();
1223  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1224  if( itr->Same( &*itr2 ) ) return true;
1225  }
1226 
1227  return false;
1228  }
1229 
1230  //------------------------------------------------------------------------
1231  // Query the stream
1232  //------------------------------------------------------------------------
1233  Status Stream::Query( uint16_t query, AnyObject &result )
1234  {
1235  switch( query )
1236  {
1237  case StreamQuery::IpAddr:
1238  {
1239  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1240  return Status();
1241  }
1242 
1243  case StreamQuery::IpStack:
1244  {
1245  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1246  return Status();
1247  }
1248 
1249  case StreamQuery::HostName:
1250  {
1251  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1252  return Status();
1253  }
1254 
1255  default:
1256  return Status( stError, errQueryNotSupported );
1257  }
1258  }
1259 
1260 }
union ServerResponse::@0 body
kXR_char streamid[2]
Definition: XProtocol.hh:914
kXR_unt16 requestid
Definition: XProtocol.hh:228
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
kXR_char fhandle[4]
Definition: XProtocol.hh:229
@ kXR_close
Definition: XProtocol.hh:115
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66
void AssignTimeout(MsgHandler *handler)
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
A network socket.
Definition: XrdClSocket.hh:43
SocketStatus
Status of the socket.
Definition: XrdClSocket.hh:49
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Definition: XrdClStream.cc:297
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Definition: XrdClStream.cc:347
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Definition: XrdClStream.cc:927
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:363
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:187
Stream(const URL *url, const URL &prefer=URL())
Constructor.
Definition: XrdClStream.cc:96
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:623
void Tick(time_t now)
Definition: XrdClStream.cc:377
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:722
~Stream()
Destructor.
Definition: XrdClStream.cc:153
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
Definition: XrdClStream.cc:585
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
Definition: XrdClStream.cc:601
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:471
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:822
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Definition: XrdClStream.cc:545
XRootDStatus Initialize()
Initializer.
Definition: XrdClStream.cc:171
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
Random utilities.
Definition: XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:681
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
@ kXR_PartialResult
Definition: XProtocol.hh:1251
MsgHandler * handler
Definition: XrdClStream.cc:67
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Definition: XrdClStream.cc:57
Describe a server login event.
Definition: XrdClMonitor.hh:72
std::string server
"user@host:port"
Definition: XrdClMonitor.hh:78
uint16_t streams
Number of streams.
Definition: XrdClMonitor.hh:82
timeval sTOD
gettimeofday() when login started
Definition: XrdClMonitor.hh:80
timeval eTOD
gettimeofday() when login ended
Definition: XrdClMonitor.hh:81
std::string auth
authentication protocol used or empty if none
Definition: XrdClMonitor.hh:79
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
bool IsFatal() const
Fatal error.
Definition: XrdClStatus.hh:123
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
Definition: XrdClStream.cc:89
AsyncSocketHandler * socket
Definition: XrdClStream.cc:86
OutQueue::MsgHelper outMsgHelper
Definition: XrdClStream.cc:88
Socket::SocketStatus status
Definition: XrdClStream.cc:90
static const uint16_t Auth
Transport name, returns std::string *.