XRootD
XrdClInQueue.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XProtocol/XProtocol.hh"
20 #include "XrdCl/XrdClInQueue.hh"
22 #include "XrdCl/XrdClMessage.hh"
23 #include "XrdCl/XrdClLog.hh"
24 #include "XrdCl/XrdClDefaultEnv.hh"
25 #include "XrdCl/XrdClConstants.hh"
26 
27 #include <arpa/inet.h> // for network unmarshalling stuff
28 
29 namespace XrdCl
30 {
31  //----------------------------------------------------------------------------
32  // Filter messages
33  //----------------------------------------------------------------------------
34  bool InQueue::DiscardMessage( Message& msg, uint16_t& sid) const
35  {
36  if( msg.GetSize() < 8 )
37  return true;
38 
39  ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
40 
41  // We only care about async responses, but those are extracted now
42  // in the SocketHandler
43  if( rsp->hdr.status == kXR_attn )
44  return true;
45  else
46  sid = ((uint16_t)rsp->hdr.streamid[1] << 8) | (uint16_t)rsp->hdr.streamid[0];
47 
48  return false;
49  }
50 
51  //----------------------------------------------------------------------------
52  // Add a listener that should be notified about incoming messages
53  //----------------------------------------------------------------------------
54  void InQueue::AddMessageHandler( MsgHandler *handler, bool &rmMsg )
55  {
56  uint16_t handlerSid = handler->GetSid();
57  XrdSysMutexHelper scopedLock( pMutex );
58 
59  pHandlers[handlerSid] = HandlerAndExpire( handler, 0 );
60  }
61 
62  //----------------------------------------------------------------------------
63  // Get a message handler interested in receiving message whose header
64  // is stored in msg
65  //----------------------------------------------------------------------------
66  MsgHandler *InQueue::GetHandlerForMessage( std::shared_ptr<Message> &msg,
67  time_t &expires,
68  uint16_t &action )
69  {
70  time_t exp = 0;
71  uint16_t act = 0;
72  uint16_t msgSid = 0;
73  MsgHandler* handler = 0;
74 
75  if (DiscardMessage(*msg, msgSid))
76  {
77  return handler;
78  }
79 
80  XrdSysMutexHelper scopedLock( pMutex );
81  HandlerMap::iterator it = pHandlers.find(msgSid);
82 
83  if (it != pHandlers.end())
84  {
85  Log *log = DefaultEnv::GetLog();
86  handler = it->second.first;
87  act = handler->Examine( msg );
88  if( it->second.second == 0 ) {
89  it->second.second = handler->GetExpiration();
90  log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
91  handler, (long long)it->second.second );
92  }
93  exp = it->second.second;
94  log->Debug( ExDbgMsg, "[msg: %p] Assigned MsgHandler: %p.",
95  msg.get(), handler );
96 
97 
98  if( act & MsgHandler::RemoveHandler )
99  {
100  pHandlers.erase( it );
101  log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
102  handler, handler );
103  }
104  }
105 
106  if( handler )
107  {
108  expires = exp;
109  action = act;
110  }
111 
112  return handler;
113  }
114 
115  //----------------------------------------------------------------------------
116  // Re-insert the handler without scanning the cached messages
117  //----------------------------------------------------------------------------
119  time_t expires )
120  {
121  uint16_t handlerSid = handler->GetSid();
122  XrdSysMutexHelper scopedLock( pMutex );
123  pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
124  }
125 
126  //----------------------------------------------------------------------------
127  // Remove a listener
128  //----------------------------------------------------------------------------
130  {
131  uint16_t handlerSid = handler->GetSid();
132  XrdSysMutexHelper scopedLock( pMutex );
133  pHandlers.erase(handlerSid);
134  Log *log = DefaultEnv::GetLog();
135  log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
136  handler, handler );
137 
138  }
139 
140  //----------------------------------------------------------------------------
141  // Report an event to the handlers
142  //----------------------------------------------------------------------------
144  XRootDStatus status )
145  {
146  uint8_t action = 0;
147  XrdSysMutexHelper scopedLock( pMutex );
148  for( HandlerMap::iterator it = pHandlers.begin(); it != pHandlers.end(); )
149  {
150  action = it->second.first->OnStreamEvent( event, status );
151 
152  if( action & MsgHandler::RemoveHandler )
153  {
154  auto next = it; ++next;
155  pHandlers.erase( it );
156  it = next;
157  }
158  else
159  ++it;
160  }
161  }
162 
163  //----------------------------------------------------------------------------
164  // Timeout handlers
165  //----------------------------------------------------------------------------
166  void InQueue::ReportTimeout( time_t now )
167  {
168  if( !now )
169  now = ::time(0);
170 
171  XrdSysMutexHelper scopedLock( pMutex );
172  HandlerMap::iterator it = pHandlers.begin();
173  while( it != pHandlers.end() )
174  {
175  if( it->second.second && it->second.second <= now )
176  {
177  uint8_t act = it->second.first->OnStreamEvent( MsgHandler::Timeout,
179  auto next = it; ++next;
180  if( act & MsgHandler::RemoveHandler )
181  pHandlers.erase( it );
182  it = next;
183  }
184  else
185  ++it;
186  }
187  }
188 
189  //----------------------------------------------------------------------------
190  // Query the handler and extract the expiration time
191  //----------------------------------------------------------------------------
193  {
194  uint16_t handlerSid = handler->GetSid();
195  XrdSysMutexHelper scopedLock( pMutex );
196  HandlerMap::iterator it = pHandlers.find( handlerSid );
197  if( it != pHandlers.end() )
198  {
199  if( it->second.second == 0 )
200  {
201  it->second.second = handler->GetExpiration();
202 
203  Log *log = DefaultEnv::GetLog();
204  log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
205  handler, (long long)it->second.second );
206 
207  }
208  }
209  }
210 
211 }
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_attn
Definition: XProtocol.hh:901
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
static Log * GetLog()
Get default log.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66
void AssignTimeout(MsgHandler *handler)
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual uint16_t Examine(std::shared_ptr< Message > &msg)=0
virtual uint16_t GetSid() const =0
virtual time_t GetExpiration()=0
StreamEvent
Events that may have occurred to the stream.
@ Timeout
The declared timeout has occurred.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t ExDbgMsg
Procedure execution status.
Definition: XrdClStatus.hh:115