XRootD
XrdCl::InQueue Class Reference

A synchronized queue for incoming data.

#include <XrdClInQueue.hh>


Public Member Functions

void AddMessageHandler (MsgHandler *handler, bool &rmMsg)
 
void AssignTimeout (MsgHandler *handler)
 
MsgHandler * GetHandlerForMessage (std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
 
void ReAddMessageHandler (MsgHandler *handler, time_t expires)
 Re-insert the handler without scanning the cached messages.
 
void RemoveMessageHandler (MsgHandler *handler)
 Remove a listener.
 
void ReportStreamEvent (MsgHandler::StreamEvent event, XRootDStatus status)
 Report an event to the handlers.
 
void ReportTimeout (time_t now=0)
 Report a timeout to the handlers whose expiry time has passed.
 

Detailed Description

A synchronized queue for incoming data.

Definition at line 36 of file XrdClInQueue.hh.

Member Function Documentation

◆ AddMessageHandler()

void XrdCl::InQueue::AddMessageHandler ( MsgHandler * handler,
bool &  rmMsg 
)

Add a listener that should be notified about incoming messages. Freshly added handlers have no expire time set and will not trigger the timeout reporting. The expiry is added by AssignTimeout or GetHandlerForMessage.

Parameters
    handler  message handler
    rmMsg    will be set to true if a leftover message matching the request has been removed from the queue
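
To make the bookkeeping concrete, here is a minimal, self-contained sketch of a handler table keyed by stream ID. ToyHandler, ToyInQueue and Lookup are hypothetical names that do not exist in XRootD, and std::map with std::mutex are assumed stand-ins for the real HandlerMap and XrdSysMutexHelper. It illustrates that a freshly added handler carries an expiry of 0, and that adding a second handler under the same stream ID replaces the first.

```cpp
#include <cstdint>
#include <ctime>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for XrdCl::MsgHandler; only the stream ID matters here.
struct ToyHandler
{
  uint16_t sid;
  uint16_t GetSid() const { return sid; }
};

// Hypothetical miniature of the handler table: stream ID -> (handler, expiry).
class ToyInQueue
{
  public:
    void AddMessageHandler( ToyHandler *handler )
    {
      std::lock_guard<std::mutex> lock( mutex_ );
      // An expiry of 0 marks "no timeout assigned yet".
      handlers_[handler->GetSid()] = std::make_pair( handler, time_t( 0 ) );
    }

    // Returns true and fills the outputs if a handler is registered under sid.
    bool Lookup( uint16_t sid, ToyHandler *&handler, time_t &expires )
    {
      std::lock_guard<std::mutex> lock( mutex_ );
      auto it = handlers_.find( sid );
      if( it == handlers_.end() )
        return false;
      handler = it->second.first;
      expires = it->second.second;
      return true;
    }

  private:
    std::mutex mutex_;
    std::map<uint16_t, std::pair<ToyHandler*, time_t>> handlers_;
};
```

Because the expiry starts at 0, such an entry would be skipped by a timeout sweep until an expiry is recorded for it.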

Definition at line 54 of file XrdClInQueue.cc.

{
  uint16_t handlerSid = handler->GetSid();
  XrdSysMutexHelper scopedLock( pMutex );

  pHandlers[handlerSid] = HandlerAndExpire( handler, 0 );
}

References XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::OnReadyToWrite().


◆ AssignTimeout()

void XrdCl::InQueue::AssignTimeout ( MsgHandler * handler)

If the specified handler is in the queue but has not yet had an expiry time assigned, query the handler for the expiry and record it. Expiry will also be assigned by GetHandlerForMessage if not already assigned.

Parameters
    handler  handler to check
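
The "assign only once" rule described above can be sketched as a free function over a plain map. This is an illustrative model, not the XRootD implementation: ToyHandler, ToyHandlerMap and AssignTimeoutOnce are hypothetical names, locking is omitted for brevity, and the boolean return value is an addition made here so the effect can be observed.

```cpp
#include <cstdint>
#include <ctime>
#include <map>
#include <utility>

// Hypothetical stand-in for XrdCl::MsgHandler.
struct ToyHandler
{
  uint16_t sid;
  time_t   expiration;
  uint16_t GetSid() const { return sid; }
  time_t   GetExpiration() const { return expiration; }
};

// Hypothetical handler table: stream ID -> (handler, expiry), 0 = unset.
using ToyHandlerMap = std::map<uint16_t, std::pair<ToyHandler*, time_t>>;

// Returns true only when an expiry was newly recorded.
bool AssignTimeoutOnce( ToyHandlerMap &handlers, ToyHandler *handler )
{
  auto it = handlers.find( handler->GetSid() );
  if( it == handlers.end() )
    return false;                 // handler is not in the queue
  if( it->second.second != 0 )
    return false;                 // expiry already assigned, keep it
  it->second.second = handler->GetExpiration();
  return true;
}
```

Calling the function a second time leaves the recorded expiry untouched, even if the handler would now report a different expiration.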

Definition at line 192 of file XrdClInQueue.cc.

{
  uint16_t handlerSid = handler->GetSid();
  XrdSysMutexHelper scopedLock( pMutex );
  HandlerMap::iterator it = pHandlers.find( handlerSid );
  if( it != pHandlers.end() )
  {
    if( it->second.second == 0 )
    {
      it->second.second = handler->GetExpiration();

      Log *log = DefaultEnv::GetLog();
      log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
                  handler, (long long)it->second.second );
    }
  }
}

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), and XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::OnMessageSent().


◆ GetHandlerForMessage()

MsgHandler * XrdCl::InQueue::GetHandlerForMessage ( std::shared_ptr< Message > &  msg,
time_t &  expires,
uint16_t &  action 
)

Get the message handler interested in receiving the message whose header is stored in msg.

Parameters
    msg      message header
    expires  handler's expiration timestamp
    action   the action declared by the handler
Returns
    the handler, or 0 if none is interested
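
The lookup contract can be modelled in a few lines. The sketch below is a simplified illustration under stated assumptions: ToyHandler, ToyHandlerMap, FindHandler and kRemoveHandler are hypothetical names, the flag value 0x0002 is chosen for the example and is not taken from this page, and Examine() takes no message argument here. It shows the two output parameters being written only when a handler is found, the expiry being filled in lazily, and the entry being dropped when the handler asks for removal.

```cpp
#include <cstdint>
#include <ctime>
#include <map>
#include <utility>

// Hypothetical flag value standing in for MsgHandler::RemoveHandler.
const uint16_t kRemoveHandler = 0x0002;

// Hypothetical stand-in for XrdCl::MsgHandler.
struct ToyHandler
{
  uint16_t sid;
  time_t   expiration;
  uint16_t examineResult;   // the action this handler declares
  uint16_t Examine() const { return examineResult; }
  time_t   GetExpiration() const { return expiration; }
};

using ToyHandlerMap = std::map<uint16_t, std::pair<ToyHandler*, time_t>>;

ToyHandler *FindHandler( ToyHandlerMap &handlers, uint16_t msgSid,
                         time_t &expires, uint16_t &action )
{
  auto it = handlers.find( msgSid );
  if( it == handlers.end() )
    return nullptr;                       // outputs are left untouched

  ToyHandler *handler = it->second.first;
  action = handler->Examine();
  if( it->second.second == 0 )            // assign the expiry on first use
    it->second.second = handler->GetExpiration();
  expires = it->second.second;

  if( action & kRemoveHandler )           // handler is done with this stream ID
    handlers.erase( it );
  return handler;
}
```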

Definition at line 66 of file XrdClInQueue.cc.

{
  time_t exp = 0;
  uint16_t act = 0;
  uint16_t msgSid = 0;
  MsgHandler* handler = 0;

  if (DiscardMessage(*msg, msgSid))
  {
    return handler;
  }

  XrdSysMutexHelper scopedLock( pMutex );
  HandlerMap::iterator it = pHandlers.find(msgSid);

  if (it != pHandlers.end())
  {
    Log *log = DefaultEnv::GetLog();
    handler = it->second.first;
    act = handler->Examine( msg );
    if( it->second.second == 0 ) {
      it->second.second = handler->GetExpiration();
      log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
                  handler, (long long)it->second.second );
    }
    exp = it->second.second;
    log->Debug( ExDbgMsg, "[msg: %p] Assigned MsgHandler: %p.",
                msg.get(), handler );

    if( act & MsgHandler::RemoveHandler )
    {
      pHandlers.erase( it );
      log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
                  handler, handler );
    }
  }

  if( handler )
  {
    expires = exp;
    action = act;
  }

  return handler;
}

References XrdCl::Log::Debug(), XrdCl::MsgHandler::Examine(), XrdCl::ExDbgMsg, XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), and XrdCl::MsgHandler::RemoveHandler.

Referenced by XrdCl::Stream::InstallIncHandler().


◆ ReAddMessageHandler()

void XrdCl::InQueue::ReAddMessageHandler ( MsgHandler * handler,
time_t  expires 
)

Re-insert the handler without scanning the cached messages.

Definition at line 118 of file XrdClInQueue.cc.

{
  uint16_t handlerSid = handler->GetSid();
  XrdSysMutexHelper scopedLock( pMutex );
  pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
}

References XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::ForceError(), and XrdCl::Stream::OnError().


◆ RemoveMessageHandler()

void XrdCl::InQueue::RemoveMessageHandler ( MsgHandler * handler)

Remove a listener.

Definition at line 129 of file XrdClInQueue.cc.

{
  uint16_t handlerSid = handler->GetSid();
  XrdSysMutexHelper scopedLock( pMutex );
  pHandlers.erase(handlerSid);
  Log *log = DefaultEnv::GetLog();
  log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
              handler, handler );
}

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::DefaultEnv::GetLog(), and XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::InspectStatusRsp(), and XrdCl::Stream::OnError().


◆ ReportStreamEvent()

void XrdCl::InQueue::ReportStreamEvent ( MsgHandler::StreamEvent  event,
XRootDStatus  status 
)

Report an event to the handlers.

Definition at line 143 of file XrdClInQueue.cc.

{
  uint8_t action = 0;
  XrdSysMutexHelper scopedLock( pMutex );
  for( HandlerMap::iterator it = pHandlers.begin(); it != pHandlers.end(); )
  {
    action = it->second.first->OnStreamEvent( event, status );

    if( action & MsgHandler::RemoveHandler )
    {
      auto next = it; ++next;
      pHandlers.erase( it );
      it = next;
    }
    else
      ++it;
  }
}
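
The loop in this function erases entries while iterating, which is only safe if the iterator is advanced before the erased element is invalidated. The sketch below shows the same pattern on a std::map, using the iterator returned by erase() (available since C++11) instead of a saved next iterator. NotifyAll and kRemoveHandler are hypothetical names, the flag value is an assumption, and the int payload stands in for the real handler pointer.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical flag value standing in for MsgHandler::RemoveHandler.
const uint8_t kRemoveHandler = 0x02;

// Calls onEvent for every entry and erases the entries whose result has the
// remove bit set. Returns the number of erased entries.
template<typename Callback>
std::size_t NotifyAll( std::map<uint16_t, int> &handlers, Callback onEvent )
{
  std::size_t removed = 0;
  for( auto it = handlers.begin(); it != handlers.end(); )
  {
    uint8_t action = onEvent( it->first, it->second );
    if( action & kRemoveHandler )
    {
      it = handlers.erase( it );   // erase() yields the following element
      ++removed;
    }
    else
      ++it;
  }
  return removed;
}
```

Both forms visit each remaining entry exactly once; the erase()-return form simply avoids the temporary iterator.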

References XrdCl::MsgHandler::RemoveHandler.

Referenced by XrdCl::Stream::ForceError(), and XrdCl::Stream::OnError().


◆ ReportTimeout()

void XrdCl::InQueue::ReportTimeout ( time_t  now = 0)

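Report a timeout to the handlers whose expiry time has passed. Handlers with no expiry assigned (expiry of 0) are skipped. If now is 0, the current time is used.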

Definition at line 166 of file XrdClInQueue.cc.

{
  if( !now )
    now = ::time(0);

  XrdSysMutexHelper scopedLock( pMutex );
  HandlerMap::iterator it = pHandlers.begin();
  while( it != pHandlers.end() )
  {
    if( it->second.second && it->second.second <= now )
    {
      uint8_t act = it->second.first->OnStreamEvent( MsgHandler::Timeout,
                                                     Status( stError, errOperationExpired ) );
      auto next = it; ++next;
      if( act & MsgHandler::RemoveHandler )
        pHandlers.erase( it );
      it = next;
    }
    else
      ++it;
  }
}
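
The selection rule used by the sweep above can be isolated in a small, self-contained sketch. ExpiryMap and ExpiredSids are hypothetical names introduced for illustration, and the map holds only the expiry rather than a handler and expiry pair. The function applies the same test as the listing: an entry qualifies only if its expiry is non-zero and not later than now, and a now of 0 is replaced by the current time.

```cpp
#include <cstdint>
#include <ctime>
#include <map>
#include <vector>

// Hypothetical table: stream ID -> expiry timestamp, 0 = no expiry assigned.
using ExpiryMap = std::map<uint16_t, time_t>;

// Returns the stream IDs whose expiry is set and not later than now.
std::vector<uint16_t> ExpiredSids( const ExpiryMap &expiries, time_t now = 0 )
{
  if( !now )
    now = ::time( nullptr );       // same default as in the listing

  std::vector<uint16_t> expired;
  for( const auto &entry : expiries )
    if( entry.second && entry.second <= now )
      expired.push_back( entry.first );
  return expired;
}
```

Note that in the listing a timed-out handler stays in the queue unless its OnStreamEvent result includes RemoveHandler, so this sketch covers only the selection step, not the removal decision.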

References XrdCl::errOperationExpired, XrdCl::MsgHandler::RemoveHandler, XrdCl::stError, and XrdCl::MsgHandler::Timeout.

Referenced by XrdCl::Stream::Tick().


The documentation for this class was generated from the following files: