XRootD
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor. More...
 
 ~Stream ()
 Destructor. More...
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty. More...
 
void Disconnect (bool force=false)
 Disconnect the stream. More...
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection. More...
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error. More...
 
const std::string & GetName () const
 Return stream name. More...
 
const URL * GetURL () const
 Get the URL. More...
 
XRootDStatus Initialize ()
 Initializer. More...
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a sub-stream has been connected. More...
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error. More...
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error. More...
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed. More...
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout. More...
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout. More...
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream. More...
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler. More...
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending. More...
 
void SetChannelData (AnyObject *channelData)
 Set the channel data. More...
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue. More...
 
void SetJobManager (JobManager *jobManager)
 Set job manager. More...
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams. More...
 
void SetPoller (Poller *poller)
 Set the poller. More...
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager. More...
 
void SetTransport (TransportHandler *transport)
 Set the transport. More...
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58  {
59  Disconnected = 0,
60  Connected = 1,
61  Connecting = 2,
62  Error = 3
63  };
@ Disconnected
Not connected.
Definition: XrdClStream.hh:59
@ Error
Broken.
Definition: XrdClStream.hh:62
@ Connected
Connected.
Definition: XrdClStream.hh:60
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:61

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() 
)

Constructor.

Definition at line 96 of file XrdClStream.cc.

96  :
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pBytesSent( 0 ),
111  pBytesReceived( 0 )
112  {
113  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115 
116  std::ostringstream o;
117  o << pUrl->GetHostId();
118  pStreamName = o.str();
119 
120  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126 
127  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129 
130  pAddressType = Utils::String2AddressType( netStack );
131  if( pAddressType == Utils::AddressType::IPAuto )
132  {
133  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
134  if( !( stacks & XrdNetUtils::hasIP64 ) )
135  {
136  if( stacks & XrdNetUtils::hasIPv4 )
137  pAddressType = Utils::AddressType::IPv4;
138  else if( stacks & XrdNetUtils::hasIPv6 )
139  pAddressType = Utils::AddressType::IPv6;
140  }
141  }
142 
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146  "Window: %d", pStreamName.c_str(), netStack.c_str(),
147  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148  }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:681
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154  {
155  Disconnect( true );
156 
157  Log *log = DefaultEnv::GetLog();
158  log->Debug( PostMasterMsg, "[%s] Destroying stream",
159  pStreamName.c_str() );
160 
161  MonitorDisconnection( XRootDStatus() );
162 
163  SubStreamList::iterator it;
164  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165  delete *it;
166  }
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:363

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
true if this channel can be collapsed using this URL, false otherwise

Definition at line 1188 of file XrdClStream.cc.

1189  {
1190  Log *log = DefaultEnv::GetLog();
1191 
1192  //--------------------------------------------------------------------------
1193  // Resolve all the addresses of the host we're supposed to connect to
1194  //--------------------------------------------------------------------------
1195  std::vector<XrdNetAddr> prefaddrs;
1196  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1197  if( !st.IsOK() )
1198  {
1199  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1200  , pStreamName.c_str(), url.GetHostName().c_str() );
1201  return false;
1202  }
1203 
1204  //--------------------------------------------------------------------------
1205  // Resolve all the addresses of the alias
1206  //--------------------------------------------------------------------------
1207  std::vector<XrdNetAddr> aliasaddrs;
1208  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1209  if( !st.IsOK() )
1210  {
1211  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1212  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1213  return false;
1214  }
1215 
1216  //--------------------------------------------------------------------------
1217  // Now check if the preferred host is part of the alias
1218  //--------------------------------------------------------------------------
1219  auto itr = prefaddrs.begin();
1220  for( ; itr != prefaddrs.end() ; ++itr )
1221  {
1222  auto itr2 = aliasaddrs.begin();
1223  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1224  if( itr->Same( &*itr2 ) ) return true;
1225  }
1226 
1227  return false;
1228  }
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 585 of file XrdClStream.cc.

586  {
587  XrdSysMutexHelper scopedLock( pMutex );
588  Log *log = DefaultEnv::GetLog();
589 
590  if( pSubStreams[subStream]->outQueue->IsEmpty() )
591  {
592  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593  pSubStreams[subStream]->socket->GetStreamName().c_str() );
594  pSubStreams[subStream]->socket->DisableUplink();
595  }
596  }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool  force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364  {
365  XrdSysMutexHelper scopedLock( pMutex );
366  SubStreamList::iterator it;
367  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368  {
369  (*it)->socket->Close();
370  (*it)->status = Socket::Disconnected;
371  }
372  }
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write-readiness events. The path is updated to reflect what has actually been enabled.

Definition at line 187 of file XrdClStream.cc.

188  {
189  XrdSysMutexHelper scopedLock( pMutex );
190 
191  //--------------------------------------------------------------------------
192  // We are in the process of connecting the main stream, so we do nothing
193  // because when the main stream connection is established it will connect
194  // all the other streams
195  //--------------------------------------------------------------------------
196  if( pSubStreams[0]->status == Socket::Connecting )
197  return XRootDStatus();
198 
199  //--------------------------------------------------------------------------
200  // The main stream is connected, so we can verify whether we have
201  // the up and the down stream connected and ready to handle data.
202  // If anything is not right we fall back to stream 0.
203  //--------------------------------------------------------------------------
204  if( pSubStreams[0]->status == Socket::Connected )
205  {
206  if( pSubStreams[path.down]->status != Socket::Connected )
207  path.down = 0;
208 
209  if( pSubStreams[path.up]->status == Socket::Disconnected )
210  {
211  path.up = 0;
212  return pSubStreams[0]->socket->EnableUplink();
213  }
214 
215  if( pSubStreams[path.up]->status == Socket::Connected )
216  return pSubStreams[path.up]->socket->EnableUplink();
217 
218  return XRootDStatus();
219  }
220 
221  //--------------------------------------------------------------------------
222  // The main stream is not connected, we need to check whether enough time
223  // has passed since we last encountered an error (if any) so that we could
224  // re-attempt the connection
225  //--------------------------------------------------------------------------
226  Log *log = DefaultEnv::GetLog();
227  time_t now = ::time(0);
228 
229  if( now-pLastStreamError < pStreamErrorWindow )
230  return pLastFatalError;
231 
232  gettimeofday( &pConnectionStarted, 0 );
233  ++pConnectionCount;
234 
235  //--------------------------------------------------------------------------
236  // Resolve all the addresses of the host we're supposed to connect to
237  //--------------------------------------------------------------------------
238  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239  if( !st.IsOK() )
240  {
241  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242  "the host", pStreamName.c_str() );
243  pLastStreamError = now;
244  st.status = stFatal;
245  pLastFatalError = st;
246  return st;
247  }
248 
249  if( pPrefer.IsValid() )
250  {
251  std::vector<XrdNetAddr> addrresses;
252  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253  if( !st.IsOK() )
254  {
255  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257  }
258  else
259  {
260  std::vector<XrdNetAddr> tmp;
261  tmp.reserve( pAddresses.size() );
262  // first add all remaining addresses
263  auto itr = pAddresses.begin();
264  for( ; itr != pAddresses.end() ; ++itr )
265  {
266  if( !HasNetAddr( *itr, addrresses ) )
267  tmp.push_back( *itr );
268  }
269  // then copy all 'preferred' addresses
270  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271  // and keep the result
272  pAddresses.swap( tmp );
273  }
274  }
275 
277  pAddresses );
278 
279  while( !pAddresses.empty() )
280  {
281  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282  pAddresses.pop_back();
283  pConnectionInitTime = ::time( 0 );
284  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285  if( st.IsOK() )
286  {
287  pSubStreams[0]->status = Socket::Connecting;
288  break;
289  }
290  }
291  return st;
292  }
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348  {
349  XrdSysMutexHelper scopedLock( pMutex );
350  if( pSubStreams[0]->status == Socket::Connecting )
351  {
352  pSubStreams[0]->status = Socket::Disconnected;
353  XrdCl::PathID path( 0, 0 );
354  XrdCl::XRootDStatus st = EnableLink( path );
355  if( !st.IsOK() )
356  OnConnectError( 0, st );
357  }
358  }
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:187
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:722
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status,
bool  hush = false 
)

Force error.

Definition at line 927 of file XrdClStream.cc.

928  {
929  XrdSysMutexHelper scopedLock( pMutex );
930  Log *log = DefaultEnv::GetLog();
931  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
932  {
933  if( pSubStreams[substream]->status != Socket::Connected ) continue;
934  pSubStreams[substream]->socket->Close();
935  pSubStreams[substream]->status = Socket::Disconnected;
936 
937  if( !hush )
938  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
939  pStreamName.c_str(), status.ToString().c_str() );
940 
941  //--------------------------------------------------------------------
942  // Reinsert the stuff that we have failed to sent
943  //--------------------------------------------------------------------
944  if( pSubStreams[substream]->outMsgHelper.msg )
945  {
946  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
947  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
948  h.stateful );
949  pIncomingQueue->RemoveMessageHandler(h.handler);
950  pSubStreams[substream]->outMsgHelper.Reset();
951  }
952 
953  //--------------------------------------------------------------------
954  // Reinsert the receiving handler and reset any partially read partial
955  //--------------------------------------------------------------------
956  if( pSubStreams[substream]->inMsgHelper.handler )
957  {
958  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
959  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
960  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
961  if( xrdHandler ) xrdHandler->PartialReceived();
962  h.Reset();
963  }
964  }
965 
966  pConnectionCount = 0;
967 
968  //------------------------------------------------------------------------
969  // We're done here, unlock the stream mutex to avoid deadlocks and
970  // report the disconnection event to the handlers
971  //------------------------------------------------------------------------
972  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
973  "message handlers.", pStreamName.c_str() );
974 
975  SubStreamList::iterator it;
976  OutQueue q;
977  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
978  q.GrabItems( *(*it)->outQueue );
979  scopedLock.UnLock();
980 
981  q.Report( status );
982 
983  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
984  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
985  }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string& XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171  {
172  return pStreamName;
173  }

◆ GetURL()

const URL* XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158  {
159  return pUrl;
160  }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172  {
173  if( !pTransport || !pPoller || !pChannelData )
174  return XRootDStatus( stError, errUninitialized );
175 
176  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177  pChannelData, 0, this );
178  pSubStreams.push_back( new SubStreamData() );
179  pSubStreams[0]->socket = s;
180  return XRootDStatus();
181  }
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
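an action code (MsgHandler::Raw, MsgHandler::Corrupted, MsgHandler::More or MsgHandler::None); when MsgHandler::Raw is returned, incHandler is set to the MsgHandler that should read out the raw data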

Definition at line 1157 of file XrdClStream.cc.

1159  {
1160  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1161  if( !mh.handler )
1163 
1164  uint16_t action = mh.handler->InspectStatusRsp();
1165  mh.action |= action;
1166 
1167  if( action & MsgHandler::RemoveHandler )
1168  pIncomingQueue->RemoveMessageHandler( mh.handler );
1169 
1170  if( action & MsgHandler::Raw )
1171  {
1172  incHandler = mh.handler;
1173  return MsgHandler::Raw;
1174  }
1175 
1176  if( action & MsgHandler::Corrupted )
1177  return MsgHandler::Corrupted;
1178 
1179  if( action & MsgHandler::More )
1180  return MsgHandler::More;
1181 
1182  return MsgHandler::None;
1183  }
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available. If the handler wants to be called in the raw mode it will be returned; the message ownership flag is returned in any case.

Parameters
msg: message header
stream: stream concerned
Returns
the handler if it wants to be called in the raw mode, nullptr otherwise

Definition at line 1136 of file XrdClStream.cc.

1137  {
1138  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1139  if( !mh.handler )
1140  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1141  mh.expires,
1142  mh.action );
1143 
1144  if( !mh.handler )
1145  return nullptr;
1146 
1147  if( mh.action & MsgHandler::Raw )
1148  return mh.handler;
1149  return nullptr;
1150  }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a sub-stream has been connected.

Definition at line 623 of file XrdClStream.cc.

624  {
625  XrdSysMutexHelper scopedLock( pMutex );
626  pSubStreams[subStream]->status = Socket::Connected;
627 
628  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629  Log *log = DefaultEnv::GetLog();
630  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631  subStream, ipstack.c_str() );
632 
633  if( subStream == 0 )
634  {
635  pLastStreamError = 0;
636  pLastFatalError = XRootDStatus();
637  pConnectionCount = 0;
638  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639  pSessionId = ++sSessCntGen;
640 
641  //------------------------------------------------------------------------
642  // Create the streams if they don't exist yet
643  //------------------------------------------------------------------------
644  if( pSubStreams.size() == 1 && numSub > 1 )
645  {
646  for( uint16_t i = 1; i < numSub; ++i )
647  {
648  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650  pChannelData, i, this );
651  pSubStreams.push_back( new SubStreamData() );
652  pSubStreams[i]->socket = s;
653  }
654  }
655 
656  //------------------------------------------------------------------------
657  // Connect the extra streams, if we fail we move all the outgoing items
658  // to stream 0, we don't need to enable the uplink here, because it
659  // should be already enabled after the handshaking process is completed.
660  //------------------------------------------------------------------------
661  if( pSubStreams.size() > 1 )
662  {
663  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664  pStreamName.c_str(), pSubStreams.size() - 1 );
665  for( size_t i = 1; i < pSubStreams.size(); ++i )
666  {
667  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669  if( !st.IsOK() )
670  {
671  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672  pSubStreams[i]->socket->Close();
673  }
674  else
675  {
676  pSubStreams[i]->status = Socket::Connecting;
677  }
678  }
679  }
680 
681  //------------------------------------------------------------------------
682  // Inform monitoring
683  //------------------------------------------------------------------------
684  pBytesSent = 0;
685  pBytesReceived = 0;
686  gettimeofday( &pConnectionDone, 0 );
687  Monitor *mon = DefaultEnv::GetMonitor();
688  if( mon )
689  {
690  Monitor::ConnectInfo i;
691  i.server = pUrl->GetHostId();
692  i.sTOD = pConnectionStarted;
693  i.eTOD = pConnectionDone;
694  i.streams = pSubStreams.size();
695 
696  AnyObject qryResult;
697  std::string *qryResponse = 0;
698  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699  qryResult.Get( qryResponse );
700  i.auth = *qryResponse;
701  delete qryResponse;
702  mon->Event( Monitor::EvConnect, &i );
703  }
704 
705  //------------------------------------------------------------------------
706  // For every connected control-stream call the global on-connect handler
707  //------------------------------------------------------------------------
709  }
710  else if( pOnDataConnJob )
711  {
712  //------------------------------------------------------------------------
713  // For every connected data-stream call the on-connect handler
714  //------------------------------------------------------------------------
715  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
716  }
717  }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 722 of file XrdClStream.cc.

723  {
724  XrdSysMutexHelper scopedLock( pMutex );
725  Log *log = DefaultEnv::GetLog();
726  pSubStreams[subStream]->socket->Close();
727  time_t now = ::time(0);
728 
729  //--------------------------------------------------------------------------
730  // For every connection error call the global connection error handler
731  //--------------------------------------------------------------------------
733 
734  //--------------------------------------------------------------------------
735  // If we connected subStream == 0 and cannot connect >0 then we just give
736  // up and move the outgoing messages to another queue
737  //--------------------------------------------------------------------------
738  if( subStream > 0 )
739  {
740  pSubStreams[subStream]->status = Socket::Disconnected;
741  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
742  if( pSubStreams[0]->status == Socket::Connected )
743  {
744  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
745  if( !st.IsOK() )
746  OnFatalError( 0, st, scopedLock );
747  return;
748  }
749 
750  if( pSubStreams[0]->status == Socket::Connecting )
751  return;
752 
753  OnFatalError( subStream, status, scopedLock );
754  return;
755  }
756 
757  //--------------------------------------------------------------------------
758  // Check if we still have time to try and do something in the current window
759  //--------------------------------------------------------------------------
760  time_t elapsed = now-pConnectionInitTime;
761  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
762  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
763 
764  //------------------------------------------------------------------------
765  // If we have some IP addresses left we try them
766  //------------------------------------------------------------------------
767  if( !pAddresses.empty() )
768  {
769  XRootDStatus st;
770  do
771  {
772  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
773  pAddresses.pop_back();
774  pConnectionInitTime = ::time( 0 );
775  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
776  }
777  while( !pAddresses.empty() && !st.IsOK() );
778 
779  if( !st.IsOK() )
780  OnFatalError( subStream, st, scopedLock );
781 
782  return;
783  }
784  //------------------------------------------------------------------------
785  // If we still can retry with the same host name, we sleep until the end
786  // of the connection window and try
787  //------------------------------------------------------------------------
788  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
789  && !status.IsFatal() )
790  {
791  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
792  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
793 
794  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
795  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
796  return;
797  }
798  //--------------------------------------------------------------------------
799  // We are out of the connection window, the only thing we can do here
800  // is re-resolving the host name and retrying if we still can
801  //--------------------------------------------------------------------------
802  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
803  {
804  pAddresses.clear();
805  pSubStreams[0]->status = Socket::Disconnected;
806  PathID path( 0, 0 );
807  XRootDStatus st = EnableLink( path );
808  if( !st.IsOK() )
809  OnFatalError( subStream, st, scopedLock );
810  return;
811  }
812 
813  //--------------------------------------------------------------------------
814  // Else, we fail
815  //--------------------------------------------------------------------------
816  OnFatalError( subStream, status, scopedLock );
817  }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global connection error handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 822 of file XrdClStream.cc.

823  {
824  XrdSysMutexHelper scopedLock( pMutex );
825  Log *log = DefaultEnv::GetLog();
826  pSubStreams[subStream]->socket->Close();
827  pSubStreams[subStream]->status = Socket::Disconnected;
828 
829  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
830  pStreamName.c_str(), subStream, status.ToString().c_str() );
831 
832  //--------------------------------------------------------------------------
833  // Reinsert the stuff that we have failed to send
834  //--------------------------------------------------------------------------
835  if( pSubStreams[subStream]->outMsgHelper.msg )
836  {
837  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
838  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
839  h.stateful );
840  pIncomingQueue->RemoveMessageHandler(h.handler);
841  pSubStreams[subStream]->outMsgHelper.Reset();
842  }
843 
844  //--------------------------------------------------------------------------
845  // Reinsert the receiving handler and reset any partially read partial
846  //--------------------------------------------------------------------------
847  if( pSubStreams[subStream]->inMsgHelper.handler )
848  {
849  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
850  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
851  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
852  if( xrdHandler ) xrdHandler->PartialReceived();
853  h.Reset();
854  }
855 
856  //--------------------------------------------------------------------------
857  // We are dealing with an error of a peripheral stream. If we don't have
858  // anything to send don't bother recovering. Otherwise move the requests
859  // to stream 0 if possible.
860  //--------------------------------------------------------------------------
861  if( subStream > 0 )
862  {
863  if( pSubStreams[subStream]->outQueue->IsEmpty() )
864  return;
865 
866  if( pSubStreams[0]->status != Socket::Disconnected )
867  {
868  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
869  if( pSubStreams[0]->status == Socket::Connected )
870  {
871  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
872  if( !st.IsOK() )
873  OnFatalError( 0, st, scopedLock );
874  return;
875  }
876  }
877  OnFatalError( subStream, status, scopedLock );
878  return;
879  }
880 
881  //--------------------------------------------------------------------------
882  // If we lost the stream 0 we have lost the session, we re-enable the
883  // stream if we still have things in one of the outgoing queues, otherwise
884  // there is no point in recovering at this point.
885  //--------------------------------------------------------------------------
886  if( subStream == 0 )
887  {
888  MonitorDisconnection( status );
889 
890  SubStreamList::iterator it;
891  size_t outstanding = 0;
892  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
893  outstanding += (*it)->outQueue->GetSizeStateless();
894 
895  if( outstanding )
896  {
897  PathID path( 0, 0 );
898  XRootDStatus st = EnableLink( path );
899  if( !st.IsOK() )
900  {
901  OnFatalError( 0, st, scopedLock );
902  return;
903  }
904  }
905 
906  //------------------------------------------------------------------------
907  // We're done here, unlock the stream mutex to avoid deadlocks and
908  // report the disconnection event to the handlers
909  //------------------------------------------------------------------------
910  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
911  "message handlers.", pStreamName.c_str() );
912  OutQueue q;
913  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
914  q.GrabStateful( *(*it)->outQueue );
915  scopedLock.UnLock();
916 
917  q.Report( status );
918  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
919  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
920  return;
921  }
922  }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message > msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474  {
475  msg->SetSessionId( pSessionId );
476  pBytesReceived += bytesReceived;
477 
478  MsgHandler *handler = nullptr;
479  uint16_t action = 0;
480  {
481  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482  handler = mh.handler;
483  action = mh.action;
484  mh.Reset();
485  }
486 
487  if( !IsPartial( *msg ) )
488  {
489  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490  *pChannelData );
491  if( streamAction & TransportHandler::DigestMsg )
492  return;
493 
494  if( streamAction & TransportHandler::RequestClose )
495  {
496  RequestClose( *msg );
497  return;
498  }
499  }
500 
501  Log *log = DefaultEnv::GetLog();
502 
503  //--------------------------------------------------------------------------
504  // No handler, we discard the message ...
505  //--------------------------------------------------------------------------
506  if( !handler )
507  {
508  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511  pStreamName.c_str(), msg.get(), rsp->hdr.status,
512  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513  return;
514  }
515 
516  //--------------------------------------------------------------------------
517  // We have a handler, so we call the callback
518  //--------------------------------------------------------------------------
519  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520  pStreamName.c_str(), msg.get() );
521 
523  {
524  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526 
527  // if we are handling partial response we have to take down the timeout fence
528  if( IsPartial( *msg ) )
529  {
530  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531  if( xrdHandler ) xrdHandler->PartialReceived();
532  }
533 
534  return;
535  }
536 
537  Job *job = new HandleIncMsgJob( handler );
538  pJobManager->QueueJob( job );
539  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message * msg,
uint32_t  bytesSent 
)

Definition at line 601 of file XrdClStream.cc.

604  {
605  pTransport->MessageSent( msg, subStream, bytesSent,
606  *pChannelData );
607  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608  pBytesSent += bytesSent;
609  if( h.handler )
610  {
611  // ensure expiration time is assigned if still in queue
612  pIncomingQueue->AssignTimeout( h.handler );
613  // OnStatusReady may cause the handler to delete itself, in
614  // which case the handler or the user callback may also delete msg
615  h.handler->OnStatusReady( msg, XRootDStatus() );
616  }
617  pSubStreams[subStream]->outMsgHelper.Reset();
618  }
void AssignTimeout(MsgHandler *handler)
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AssignTimeout(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), and XrdCl::MsgHandler::OnStatusReady().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1044 of file XrdClStream.cc.

1045  {
1046  //--------------------------------------------------------------------------
1047  // We only take the main stream into account
1048  //--------------------------------------------------------------------------
1049  if( substream != 0 )
1050  return true;
1051 
1052  //--------------------------------------------------------------------------
1053  // Check if there is no outgoing messages and if the stream TTL is elapsed.
1054  // It is assumed that the underlying transport makes sure that there is no
1055  // pending requests that are not answered, ie. all possible virtual streams
1056  // are de-allocated
1057  //--------------------------------------------------------------------------
1058  Log *log = DefaultEnv::GetLog();
1059  SubStreamList::iterator it;
1060  time_t now = time(0);
1061 
1062  XrdSysMutexHelper scopedLock( pMutex );
1063  uint32_t outgoingMessages = 0;
1064  time_t lastActivity = 0;
1065  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1066  {
1067  outgoingMessages += (*it)->outQueue->GetSize();
1068  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1069  if( lastActivity < sockLastActivity )
1070  lastActivity = sockLastActivity;
1071  }
1072 
1073  if( !outgoingMessages )
1074  {
1075  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1076  *pChannelData );
1077  if( disconnect )
1078  {
1079  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1080  pStreamName.c_str() );
1081  scopedLock.UnLock();
1082  //----------------------------------------------------------------------
1083  // Important note!
1084  //
1085  // This destroys the Stream object itself, the underlying
1086  // AsyncSocketHandler object (that called this method) and the Channel
1087  // object that aggregates this Stream.
1088  //----------------------------------------------------------------------
1090  return false;
1091  }
1092  }
1093 
1094  //--------------------------------------------------------------------------
1095  // Check if the stream is broken
1096  //--------------------------------------------------------------------------
1097  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1098  *pChannelData );
1099  if( !st.IsOK() )
1100  {
1101  scopedLock.UnLock();
1102  OnError( substream, st );
1103  return false;
1104  }
1105  return true;
1106  }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:822
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 545 of file XrdClStream.cc.

546  {
547  XrdSysMutexHelper scopedLock( pMutex );
548  Log *log = DefaultEnv::GetLog();
549  if( pSubStreams[subStream]->outQueue->IsEmpty() )
550  {
551  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552  pSubStreams[subStream]->socket->GetStreamName().c_str() );
553 
554  pSubStreams[subStream]->socket->DisableUplink();
555  return std::make_pair( (Message *)0, (MsgHandler *)0 );
556  }
557 
558  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560  h.expires,
561  h.stateful );
562 
563  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564  "from out-queue to in-queue, starting to send outgoing.",
565  pUrl->GetHostId().c_str(), h.handler,
566  h.msg->GetObfuscatedDescription().c_str() );
567 
568  scopedLock.UnLock();
569 
570  if( h.handler )
571  {
572  bool rmMsg = false;
573  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574  if( rmMsg )
575  {
576  Log *log = DefaultEnv::GetLog();
577  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578  pStreamName.c_str() );
579  }
580  h.handler->OnReadyToSend( h.msg );
581  }
582  return std::make_pair( h.msg, h.handler );
583  }
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54

References XrdCl::InQueue::AddMessageHandler(), XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdSysMutexHelper::UnLock(), and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1111 of file XrdClStream.cc.

1112  {
1113  return true;
1114  }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject result 
)

Query the stream.

Definition at line 1233 of file XrdClStream.cc.

1234  {
1235  switch( query )
1236  {
1237  case StreamQuery::IpAddr:
1238  {
1239  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1240  return Status();
1241  }
1242 
1243  case StreamQuery::IpStack:
1244  {
1245  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1246  return Status();
1247  }
1248 
1249  case StreamQuery::HostName:
1250  {
1251  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1252  return Status();
1253  }
1254 
1255  default:
1256  return Status( stError, errQueryNotSupported );
1257  }
1258  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1119 of file XrdClStream.cc.

1120  {
1121  pChannelEvHandlers.AddHandler( handler );
1122  }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1127 of file XrdClStream.cc.

1128  {
1129  pChannelEvHandlers.RemoveHandler( handler );
1130  }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301  {
302  XrdSysMutexHelper scopedLock( pMutex );
303  Log *log = DefaultEnv::GetLog();
304 
305  //--------------------------------------------------------------------------
306  // Check the session ID and bounce if needed
307  //--------------------------------------------------------------------------
308  if( msg->GetSessionId() &&
309  (pSubStreams[0]->status != Socket::Connected ||
310  pSessionId != msg->GetSessionId()) )
311  return XRootDStatus( stError, errInvalidSession );
312 
313  //--------------------------------------------------------------------------
314  // Decide on the path to send the message
315  //--------------------------------------------------------------------------
316  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317  if( pSubStreams.size() <= path.up )
318  {
319  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320  "substream %d, using 0 instead", pStreamName.c_str(),
321  msg->GetObfuscatedDescription().c_str(), path.up );
322  path.up = 0;
323  }
324 
325  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326  "substream %d expecting answer at %d", pStreamName.c_str(),
327  msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328 
329  //--------------------------------------------------------------------------
330  // Enable *a* path and insert the message to the right queue
331  //--------------------------------------------------------------------------
332  XRootDStatus st = EnableLink( path );
333  if( st.IsOK() )
334  {
335  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337  expires, stateful );
338  }
339  else
340  st.status = stFatal;
341  return st;
342  }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116  {
117  pChannelData = channelData;
118  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108  {
109  pIncomingQueue = incomingQueue;
110  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132  {
133  pJobManager = jobManager;
134  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264  {
265  XrdSysMutexHelper scopedLock( pMutex );
266  pOnDataConnJob = onConnJob;
267  }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller * poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100  {
101  pPoller = poller;
102  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124  {
125  pTaskManager = taskManager;
126  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92  {
93  pTransport = transport;
94  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by socket timeout, or by the task manager event

Definition at line 377 of file XrdClStream.cc.

378  {
379  //--------------------------------------------------------------------------
380  // Check for timed-out requests and incoming handlers
381  //--------------------------------------------------------------------------
382  pMutex.Lock();
383  OutQueue q;
384  SubStreamList::iterator it;
385  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386  q.GrabExpired( *(*it)->outQueue, now );
387  pMutex.UnLock();
388 
389  q.Report( XRootDStatus( stError, errOperationExpired ) );
390  pIncomingQueue->ReportTimeout( now );
391  }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: