XRootD
XrdClFileStateHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClStatus.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClConstants.hh"
35 #include "XrdCl/XrdClMonitor.hh"
36 #include "XrdCl/XrdClFileTimer.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdCl/XrdClAnyObject.hh"
41 #include "XrdCl/XrdClUtils.hh"
42 
43 #ifdef WITH_XRDEC
44 #include "XrdCl/XrdClEcHandler.hh"
45 #endif
46 
47 #include "XrdOuc/XrdOucCRC.hh"
49 
51 #include "XrdSys/XrdSysPageSize.hh"
52 #include "XrdSys/XrdSysPthread.hh"
53 
54 #include <sstream>
55 #include <memory>
56 #include <numeric>
57 #include <sys/time.h>
58 #include <uuid/uuid.h>
59 #include <mutex>
60 
61 namespace
62 {
63  //----------------------------------------------------------------------------
64  // Helper callback for handling PgRead responses
65  //----------------------------------------------------------------------------
66  class PgReadHandler : public XrdCl::ResponseHandler
67  {
68  friend class PgReadRetryHandler;
69 
70  public:
71 
72  //------------------------------------------------------------------------
73  // Constructor
74  //------------------------------------------------------------------------
75  PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
76  XrdCl::ResponseHandler *userHandler,
77  uint64_t orgOffset ) :
78  stateHandler( stateHandler ),
79  userHandler( userHandler ),
80  orgOffset( orgOffset ),
81  maincall( true ),
82  retrycnt( 0 ),
83  nbrepair( 0 )
84  {
85  }
86 
87  //------------------------------------------------------------------------
88  // Handle the response
89  //------------------------------------------------------------------------
91  XrdCl::AnyObject *response,
92  XrdCl::HostList *hostList )
93  {
94  using namespace XrdCl;
95 
96  std::unique_lock<std::mutex> lck( mtx );
97 
98  if( !maincall )
99  {
100  //--------------------------------------------------------------------
101  // We are serving PgRead retry request
102  //--------------------------------------------------------------------
103  --retrycnt;
104  if( !status->IsOK() )
105  st.reset( status );
106  else
107  {
108  delete status; // by convention other args are null (see PgReadRetryHandler)
109  ++nbrepair; // update number of repaired pages
110  }
111 
112  if( retrycnt == 0 )
113  {
114  //------------------------------------------------------------------
115  // All retries came back
116  //------------------------------------------------------------------
117  if( st->IsOK() )
118  {
119  PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
120  pginf.SetNbRepair( nbrepair );
121  userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
122  }
123  else
124  userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
125  lck.unlock();
126  delete this;
127  }
128 
129  return;
130  }
131 
132  //----------------------------------------------------------------------
133  // We are serving main PgRead request
134  //----------------------------------------------------------------------
135  if( !status->IsOK() )
136  {
137  //--------------------------------------------------------------------
138  // The main PgRead request has failed
139  //--------------------------------------------------------------------
140  userHandler->HandleResponseWithHosts( status, response, hostList );
141  lck.unlock();
142  delete this;
143  return;
144  }
145 
146  maincall = false;
147 
148  //----------------------------------------------------------------------
149  // Do the integrity check
150  //----------------------------------------------------------------------
151  PageInfo *pginf = 0;
152  response->Get( pginf );
153 
154  uint64_t pgoff = pginf->GetOffset();
155  uint32_t bytesRead = pginf->GetLength();
156  std::vector<uint32_t> &cksums = pginf->GetCksums();
157  char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
158  size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
159  uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
160  if( pgsize > bytesRead ) pgsize = bytesRead;
161 
162  for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
163  {
164  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
165  if( crcval != cksums[pgnb] )
166  {
167  Log *log = DefaultEnv::GetLog();
168  log->Info( FileMsg, "[0x%x@%s] Received corrupted page, will retry page #%d.",
169  this, stateHandler->pFileUrl->GetURL().c_str(), pgnb );
170 
171  XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
172  if( !st.IsOK())
173  {
174  *status = st; // the reason for this failure
175  break;
176  }
177  ++retrycnt; // update the retry counter
178  }
179 
180  bytesRead -= pgsize;
181  buffer += pgsize;
182  pgoff += pgsize;
183  pgsize = XrdSys::PageSize;
184  if( pgsize > bytesRead ) pgsize = bytesRead;
185  }
186 
187 
188  if( retrycnt == 0 )
189  {
190  //--------------------------------------------------------------------
191  // All went well!
192  //--------------------------------------------------------------------
193  userHandler->HandleResponseWithHosts( status, response, hostList );
194  lck.unlock();
195  delete this;
196  return;
197  }
198 
199  //----------------------------------------------------------------------
200  // We have to wait for retries!
201  //----------------------------------------------------------------------
202  resp.reset( response );
203  hosts.reset( hostList );
204  st.reset( status );
205  }
206 
207  void UpdateCksum( size_t pgnb, uint32_t crcval )
208  {
209  if( resp )
210  {
211  XrdCl::PageInfo *pginf = 0;
212  resp->Get( pginf );
213  pginf->GetCksums()[pgnb] = crcval;
214  }
215  }
216 
217  private:
218 
219  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
220  XrdCl::ResponseHandler *userHandler;
221  uint64_t orgOffset;
222 
223  std::unique_ptr<XrdCl::AnyObject> resp;
224  std::unique_ptr<XrdCl::HostList> hosts;
225  std::unique_ptr<XrdCl::XRootDStatus> st;
226 
227  std::mutex mtx;
228  bool maincall;
229  size_t retrycnt;
230  size_t nbrepair;
231 
232  };
233 
234  //----------------------------------------------------------------------------
235  // Helper callback for handling PgRead retries
236  //----------------------------------------------------------------------------
237  class PgReadRetryHandler : public XrdCl::ResponseHandler
238  {
239  public:
240 
241  PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
242  pgnb( pgnb )
243  {
244 
245  }
246 
247  //------------------------------------------------------------------------
248  // Handle the response
249  //------------------------------------------------------------------------
250  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
251  XrdCl::AnyObject *response,
252  XrdCl::HostList *hostList )
253  {
254  using namespace XrdCl;
255 
256  if( !status->IsOK() )
257  {
258  Log *log = DefaultEnv::GetLog();
259  log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
260  this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
261  pgReadHandler->HandleResponseWithHosts( status, response, hostList );
262  delete this;
263  return;
264  }
265 
266  XrdCl::PageInfo *pginf = 0;
267  response->Get( pginf );
268  if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
269  {
270  Log *log = DefaultEnv::GetLog();
271  log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
272  this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
273  // we retry a page at a time so the length cannot exceed 4KB
274  DeleteArgs( status, response, hostList );
275  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
276  delete this;
277  return;
278  }
279 
280  uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
281  if( crcval != pginf->GetCksums().front() )
282  {
283  Log *log = DefaultEnv::GetLog();
284  log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
285  this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
286  DeleteArgs( status, response, hostList );
287  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
288  delete this;
289  return;
290  }
291 
292  Log *log = DefaultEnv::GetLog();
293  log->Info( FileMsg, "[0x%x@%s] Successfully recovered page #%d.",
294  this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
295 
296  DeleteArgs( 0, response, hostList );
297  pgReadHandler->UpdateCksum( pgnb, crcval );
298  pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
299  delete this;
300  }
301 
302  private:
303 
304  inline void DeleteArgs( XrdCl::XRootDStatus *status,
305  XrdCl::AnyObject *response,
306  XrdCl::HostList *hostList )
307  {
308  delete status;
309  delete response;
310  delete hostList;
311  }
312 
313  PgReadHandler *pgReadHandler;
314  size_t pgnb;
315  };
316 
317  //----------------------------------------------------------------------------
318  // Handle PgRead substitution with ordinary Read
319  //----------------------------------------------------------------------------
321  {
322  public:
323 
324  //------------------------------------------------------------------------
325  // Constructor
326  //------------------------------------------------------------------------
327  PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
328  XrdCl::ResponseHandler *userHandler ) :
329  stateHandler( stateHandler ),
330  userHandler( userHandler )
331  {
332  }
333 
334  //------------------------------------------------------------------------
335  // Handle the response
336  //------------------------------------------------------------------------
337  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
338  XrdCl::AnyObject *rdresp,
339  XrdCl::HostList *hostList )
340  {
341  if( !status->IsOK() )
342  {
343  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
344  delete this;
345  return;
346  }
347 
348  using namespace XrdCl;
349 
350  ChunkInfo *chunk = 0;
351  rdresp->Get( chunk );
352 
353  std::vector<uint32_t> cksums;
354  if( stateHandler->pIsChannelEncrypted )
355  {
356  size_t nbpages = chunk->length / XrdSys::PageSize;
357  if( chunk->length % XrdSys::PageSize )
358  ++nbpages;
359  cksums.reserve( nbpages );
360 
361  size_t size = chunk->length;
362  char *buffer = reinterpret_cast<char*>( chunk->buffer );
363 
364  for( size_t pg = 0; pg < nbpages; ++pg )
365  {
366  size_t pgsize = XrdSys::PageSize;
367  if( pgsize > size ) pgsize = size;
368  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
369  cksums.push_back( crcval );
370  buffer += pgsize;
371  size -= pgsize;
372  }
373  }
374 
375  PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
376  chunk->buffer, std::move( cksums ) );
377  delete rdresp;
378  AnyObject *response = new AnyObject();
379  response->Set( pages );
380  userHandler->HandleResponseWithHosts( status, response, hostList );
381 
382  delete this;
383  }
384 
385  private:
386 
387  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
388  XrdCl::ResponseHandler *userHandler;
389  };
390 
391  //----------------------------------------------------------------------------
392  // Object that does things to the FileStateHandler when kXR_open returns
393  // and then calls the user handler
394  //----------------------------------------------------------------------------
395  class OpenHandler: public XrdCl::ResponseHandler
396  {
397  public:
398  //------------------------------------------------------------------------
399  // Constructor
400  //------------------------------------------------------------------------
401  OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
402  XrdCl::ResponseHandler *userHandler ):
403  pStateHandler( stateHandler ),
404  pUserHandler( userHandler )
405  {
406  }
407 
408  //------------------------------------------------------------------------
409  // Handle the response
410  //------------------------------------------------------------------------
411  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
412  XrdCl::AnyObject *response,
413  XrdCl::HostList *hostList )
414  {
415  using namespace XrdCl;
416 
417  //----------------------------------------------------------------------
418  // Extract the statistics info
419  //----------------------------------------------------------------------
420  OpenInfo *openInfo = 0;
421  if( status->IsOK() )
422  response->Get( openInfo );
423 #ifdef WITH_XRDEC
424  else
425  //--------------------------------------------------------------------
426  // Handle EC redirect
427  //--------------------------------------------------------------------
428  if( status->code == errRedirect )
429  {
430  std::string ecurl = status->GetErrorMessage();
431  EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
432  if( ecHandler )
433  {
434  pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
435  ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
436  return;
437  }
438  }
439 #endif
440  //----------------------------------------------------------------------
441  // Notify the state handler and the client and say bye bye
442  //----------------------------------------------------------------------
443  pStateHandler->OnOpen( status, openInfo, hostList );
444  delete response;
445  if( pUserHandler )
446  pUserHandler->HandleResponseWithHosts( status, 0, hostList );
447  else
448  {
449  delete status;
450  delete hostList;
451  }
452  delete this;
453  }
454 
455  private:
456  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
457  XrdCl::ResponseHandler *pUserHandler;
458  };
459 
460  //----------------------------------------------------------------------------
461  // Object that does things to the FileStateHandler when kXR_close returns
462  // and then calls the user handler
463  //----------------------------------------------------------------------------
464  class CloseHandler: public XrdCl::ResponseHandler
465  {
466  public:
467  //------------------------------------------------------------------------
468  // Constructor
469  //------------------------------------------------------------------------
470  CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
471  XrdCl::ResponseHandler *userHandler,
472  XrdCl::Message *message ):
473  pStateHandler( stateHandler ),
474  pUserHandler( userHandler ),
475  pMessage( message )
476  {
477  }
478 
479  //------------------------------------------------------------------------
481  //------------------------------------------------------------------------
482  virtual ~CloseHandler()
483  {
484  delete pMessage;
485  }
486 
487  //------------------------------------------------------------------------
488  // Handle the response
489  //------------------------------------------------------------------------
490  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
491  XrdCl::AnyObject *response,
492  XrdCl::HostList *hostList )
493  {
494  pStateHandler->OnClose( status );
495  if( pUserHandler )
496  pUserHandler->HandleResponseWithHosts( status, response, hostList );
497  else
498  {
499  delete response;
500  delete status;
501  delete hostList;
502  }
503 
504  delete this;
505  }
506 
507  private:
508  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
509  XrdCl::ResponseHandler *pUserHandler;
510  XrdCl::Message *pMessage;
511  };
512 
513  //----------------------------------------------------------------------------
514  // Stateful message handler
515  //----------------------------------------------------------------------------
516  class StatefulHandler: public XrdCl::ResponseHandler
517  {
518  public:
519  //------------------------------------------------------------------------
520  // Constructor
521  //------------------------------------------------------------------------
522  StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
523  XrdCl::ResponseHandler *userHandler,
524  XrdCl::Message *message,
525  const XrdCl::MessageSendParams &sendParams ):
526  pStateHandler( stateHandler ),
527  pUserHandler( userHandler ),
528  pMessage( message ),
529  pSendParams( sendParams )
530  {
531  }
532 
533  //------------------------------------------------------------------------
534  // Destructor
535  //------------------------------------------------------------------------
536  virtual ~StatefulHandler()
537  {
538  delete pMessage;
539  delete pSendParams.chunkList;
540  delete pSendParams.kbuff;
541  }
542 
543  //------------------------------------------------------------------------
544  // Handle the response
545  //------------------------------------------------------------------------
546  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
547  XrdCl::AnyObject *response,
548  XrdCl::HostList *hostList )
549  {
550  using namespace XrdCl;
551  std::unique_ptr<AnyObject> responsePtr( response );
552  pSendParams.hostList = hostList;
553 
554  //----------------------------------------------------------------------
555  // Houston we have a problem...
556  //----------------------------------------------------------------------
557  if( !status->IsOK() )
558  {
559  XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
560  return;
561  }
562 
563  //----------------------------------------------------------------------
564  // We're clear
565  //----------------------------------------------------------------------
566  responsePtr.release();
567  XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
568  if( pUserHandler )
569  pUserHandler->HandleResponseWithHosts( status, response, hostList );
570  else
571  {
572  delete status,
573  delete response;
574  delete hostList;
575  }
576  delete this;
577  }
578 
579  //------------------------------------------------------------------------
581  //------------------------------------------------------------------------
582  XrdCl::ResponseHandler *GetUserHandler()
583  {
584  return pUserHandler;
585  }
586 
587  private:
588  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
589  XrdCl::ResponseHandler *pUserHandler;
590  XrdCl::Message *pMessage;
591  XrdCl::MessageSendParams pSendParams;
592  };
593 
594  //----------------------------------------------------------------------------
595  // Release-buffer Handler
596  //----------------------------------------------------------------------------
597  class ReleaseBufferHandler: public XrdCl::ResponseHandler
598  {
599  public:
600 
601  //------------------------------------------------------------------------
602  // Constructor
603  //------------------------------------------------------------------------
604  ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
605  buffer( std::move( buffer ) ),
606  handler( handler )
607  {
608  }
609 
610  //------------------------------------------------------------------------
611  // Handle the response
612  //------------------------------------------------------------------------
613  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
614  XrdCl::AnyObject *response,
615  XrdCl::HostList *hostList )
616  {
617  if (handler)
618  handler->HandleResponseWithHosts( status, response, hostList );
619  }
620 
621  //------------------------------------------------------------------------
622  // Get the underlying buffer
623  //------------------------------------------------------------------------
624  XrdCl::Buffer& GetBuffer()
625  {
626  return buffer;
627  }
628 
629  private:
630  XrdCl::Buffer buffer;
631  XrdCl::ResponseHandler *handler;
632  };
633 }
634 
635 namespace XrdCl
636 {
637  //----------------------------------------------------------------------------
638  // Constructor
639  //----------------------------------------------------------------------------
640  FileStateHandler::FileStateHandler( FilePlugIn *& plugin ):
641  pFileState( Closed ),
642  pStatInfo( 0 ),
643  pFileUrl( 0 ),
644  pDataServer( 0 ),
645  pLoadBalancer( 0 ),
646  pStateRedirect( 0 ),
647  pWrtRecoveryRedir( 0 ),
648  pFileHandle( 0 ),
649  pOpenMode( 0 ),
650  pOpenFlags( 0 ),
651  pSessionId( 0 ),
652  pDoRecoverRead( true ),
653  pDoRecoverWrite( true ),
654  pFollowRedirects( true ),
655  pUseVirtRedirector( true ),
656  pIsChannelEncrypted( false ),
657  pAllowBundledClose( false ),
658  pPlugin( plugin )
659  {
660  pFileHandle = new uint8_t[4];
661  ResetMonitoringVars();
664  pLFileHandler = new LocalFileHandler();
665  }
666 
667  //------------------------------------------------------------------------
672  //------------------------------------------------------------------------
673  FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
674  pFileState( Closed ),
675  pStatInfo( 0 ),
676  pFileUrl( 0 ),
677  pDataServer( 0 ),
678  pLoadBalancer( 0 ),
679  pStateRedirect( 0 ),
680  pWrtRecoveryRedir( 0 ),
681  pFileHandle( 0 ),
682  pOpenMode( 0 ),
683  pOpenFlags( 0 ),
684  pSessionId( 0 ),
685  pDoRecoverRead( true ),
686  pDoRecoverWrite( true ),
687  pFollowRedirects( true ),
688  pUseVirtRedirector( useVirtRedirector ),
689  pAllowBundledClose( false ),
690  pPlugin( plugin )
691  {
692  pFileHandle = new uint8_t[4];
693  ResetMonitoringVars();
696  pLFileHandler = new LocalFileHandler();
697  }
698 
699  //----------------------------------------------------------------------------
700  // Destructor
701  //----------------------------------------------------------------------------
703  {
704  //--------------------------------------------------------------------------
705  // This, in principle, should never ever happen. Except for the case
706  // when we're interfaced with ROOT that may call this desctructor from
707  // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
708  // has been finalized by the linker. So, if we don't have the log object
709  // at this point we just give up the hope.
710  //--------------------------------------------------------------------------
711  if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
712  DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
713 
716 
719 
720  if( pFileState != Closed && DefaultEnv::GetLog() )
721  {
722  XRootDStatus st;
723  MonitorClose( &st );
724  ResetMonitoringVars();
725  }
726 
727  // check if the logger is still there, this is only for root, as root might
728  // have unload us already so in this case we don't want to do anything
729  if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
730  {
732  registry.Release( *pFileUrl );
733  }
734 
735  delete pStatInfo;
736  delete pFileUrl;
737  delete pDataServer;
738  delete pLoadBalancer;
739  delete [] pFileHandle;
740  delete pLFileHandler;
741  }
742 
743  //----------------------------------------------------------------------------
744  // Open the file pointed to by the given URL
745  //----------------------------------------------------------------------------
746  XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
747  const std::string &url,
748  uint16_t flags,
749  uint16_t mode,
750  ResponseHandler *handler,
751  uint16_t timeout )
752  {
753  XrdSysMutexHelper scopedLock( self->pMutex );
754 
755  //--------------------------------------------------------------------------
756  // Check if we can proceed
757  //--------------------------------------------------------------------------
758  if( self->pFileState == Error )
759  return self->pStatus;
760 
761  if( self->pFileState == OpenInProgress )
763 
764  if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
765  self->pFileState == Recovering )
766  return XRootDStatus( stError, errInvalidOp );
767 
768  self->pFileState = OpenInProgress;
769 
770  //--------------------------------------------------------------------------
771  // Check if the parameters are valid
772  //--------------------------------------------------------------------------
773  Log *log = DefaultEnv::GetLog();
774 
775  if( self->pFileUrl )
776  {
777  if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
778  {
780  registry.Release( *self->pFileUrl );
781  }
782  delete self->pFileUrl;
783  self->pFileUrl = 0;
784  }
785 
786  self->pFileUrl = new URL( url );
787 
788  //--------------------------------------------------------------------------
789  // Add unique uuid to each open request so replays due to error/timeout
790  // recovery can be correctly handled.
791  //--------------------------------------------------------------------------
792  URL::ParamsMap cgi = self->pFileUrl->GetParams();
793  uuid_t uuid;
794  char requuid[37]= {0};
795  uuid_generate( uuid );
796  uuid_unparse( uuid, requuid );
797  cgi["xrdcl.requuid"] = requuid;
798  self->pFileUrl->SetParams( cgi );
799 
800  if( !self->pFileUrl->IsValid() )
801  {
802  log->Error( FileMsg, "[0x%x@%s] Trying to open invalid url: %s",
803  self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
804  self->pStatus = XRootDStatus( stError, errInvalidArgs );
805  self->pFileState = Closed;
806  return self->pStatus;
807  }
808 
809  //--------------------------------------------------------------------------
810  // Check if the recovery procedures should be enabled
811  //--------------------------------------------------------------------------
812  const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
813  URL::ParamsMap::const_iterator it;
814  it = urlParams.find( "xrdcl.recover-reads" );
815  if( (it != urlParams.end() && it->second == "false") ||
816  !self->pDoRecoverRead )
817  {
818  self->pDoRecoverRead = false;
819  log->Debug( FileMsg, "[0x%x@%s] Read recovery procedures are disabled",
820  self.get(), self->pFileUrl->GetURL().c_str() );
821  }
822 
823  it = urlParams.find( "xrdcl.recover-writes" );
824  if( (it != urlParams.end() && it->second == "false") ||
825  !self->pDoRecoverWrite )
826  {
827  self->pDoRecoverWrite = false;
828  log->Debug( FileMsg, "[0x%x@%s] Write recovery procedures are disabled",
829  self.get(), self->pFileUrl->GetURL().c_str() );
830  }
831 
832  //--------------------------------------------------------------------------
833  // Open the file
834  //--------------------------------------------------------------------------
835  log->Debug( FileMsg, "[0x%x@%s] Sending an open command", self.get(),
836  self->pFileUrl->GetURL().c_str() );
837 
838  self->pOpenMode = mode;
839  self->pOpenFlags = flags;
840  OpenHandler *openHandler = new OpenHandler( self, handler );
841 
842  Message *msg;
843  ClientOpenRequest *req;
844  std::string path = self->pFileUrl->GetPathWithFilteredParams();
845  MessageUtils::CreateRequest( msg, req, path.length() );
846 
847  req->requestid = kXR_open;
848  req->mode = mode;
849  req->options = flags | kXR_async | kXR_retstat;
850  req->dlen = path.length();
851  msg->Append( path.c_str(), path.length(), 24 );
852 
854  MessageSendParams params; params.timeout = timeout;
855  params.followRedirects = self->pFollowRedirects;
857 
858  XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
859 
860  if( !st.IsOK() )
861  {
862  delete openHandler;
863  self->pStatus = st;
864  self->pFileState = Closed;
865  return st;
866  }
867  return st;
868  }
869 
870  //----------------------------------------------------------------------------
871  // Close the file object
872  //----------------------------------------------------------------------------
873  XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
874  ResponseHandler *handler,
875  uint16_t timeout )
876  {
877  XrdSysMutexHelper scopedLock( self->pMutex );
878 
879  //--------------------------------------------------------------------------
880  // Check if we can proceed
881  //--------------------------------------------------------------------------
882  if( self->pFileState == Error )
883  return self->pStatus;
884 
885  if( self->pFileState == CloseInProgress )
887 
888  if( self->pFileState == Closed )
889  return XRootDStatus( stOK, suAlreadyDone );
890 
891  if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
892  return XRootDStatus( stError, errInvalidOp );
893 
894  if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
895  return XRootDStatus( stError, errInvalidOp );
896 
897  self->pFileState = CloseInProgress;
898 
899  Log *log = DefaultEnv::GetLog();
900  log->Debug( FileMsg, "[0x%x@%s] Sending a close command for handle 0x%x to "
901  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
902  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
903 
904  //--------------------------------------------------------------------------
905  // Close the file
906  //--------------------------------------------------------------------------
907  Message *msg;
908  ClientCloseRequest *req;
909  MessageUtils::CreateRequest( msg, req );
910 
911  req->requestid = kXR_close;
912  memcpy( req->fhandle, self->pFileHandle, 4 );
913 
915  msg->SetSessionId( self->pSessionId );
916  CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
917  MessageSendParams params;
918  params.timeout = timeout;
919  params.followRedirects = false;
920  params.stateful = true;
922 
923  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
924 
925  if( !st.IsOK() )
926  {
927  // an invalid-session error means the connection to the server has been
928  // closed, which in turn means that the server closed the file already
929  if( st.code == errInvalidSession || st.code == errSocketDisconnected ||
931  st.code == errPollerError || st.code == errSocketError )
932  {
933  self->pFileState = Closed;
934  ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
935  nullptr, nullptr );
937  return XRootDStatus();
938  }
939 
940  delete closeHandler;
941  self->pStatus = st;
942  self->pFileState = Error;
943  return st;
944  }
945  return st;
946  }
947 
948  //----------------------------------------------------------------------------
949  // Stat the file
950  //----------------------------------------------------------------------------
951  XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
952  bool force,
953  ResponseHandler *handler,
954  uint16_t timeout )
955  {
956  XrdSysMutexHelper scopedLock( self->pMutex );
957 
958  if( self->pFileState == Error ) return self->pStatus;
959 
960  if( self->pFileState != Opened && self->pFileState != Recovering )
961  return XRootDStatus( stError, errInvalidOp );
962 
963  //--------------------------------------------------------------------------
964  // Return the cached info
965  //--------------------------------------------------------------------------
966  if( !force )
967  {
968  AnyObject *obj = new AnyObject();
969  obj->Set( new StatInfo( *self->pStatInfo ) );
970  if (handler)
971  handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
972  return XRootDStatus();
973  }
974 
975  Log *log = DefaultEnv::GetLog();
976  log->Debug( FileMsg, "[0x%x@%s] Sending a stat command for handle 0x%x to "
977  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
978  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
979 
980  //--------------------------------------------------------------------------
981  // Issue a new stat request
982  // stating a file handle doesn't work (fixed in 3.2.0) so we need to
983  // stat the pat
984  //--------------------------------------------------------------------------
985  Message *msg;
986  ClientStatRequest *req;
987  std::string path = self->pFileUrl->GetPath();
988  MessageUtils::CreateRequest( msg, req );
989 
990  req->requestid = kXR_stat;
991  memcpy( req->fhandle, self->pFileHandle, 4 );
992 
993  MessageSendParams params;
994  params.timeout = timeout;
995  params.followRedirects = false;
996  params.stateful = true;
998 
1000  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1001 
1002  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1003  }
1004 
1005  //----------------------------------------------------------------------------
1006  // Read a data chunk at a given offset - sync
1007  //----------------------------------------------------------------------------
1008  XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1009  uint64_t offset,
1010  uint32_t size,
1011  void *buffer,
1012  ResponseHandler *handler,
1013  uint16_t timeout )
1014  {
1015  XrdSysMutexHelper scopedLock( self->pMutex );
1016 
1017  if( self->pFileState == Error ) return self->pStatus;
1018 
1019  if( self->pFileState != Opened && self->pFileState != Recovering )
1020  return XRootDStatus( stError, errInvalidOp );
1021 
1022  Log *log = DefaultEnv::GetLog();
1023  log->Debug( FileMsg, "[0x%x@%s] Sending a read command for handle 0x%x to "
1024  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1025  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1026 
1027  Message *msg;
1028  ClientReadRequest *req;
1029  MessageUtils::CreateRequest( msg, req );
1030 
1031  req->requestid = kXR_read;
1032  req->offset = offset;
1033  req->rlen = size;
1034  memcpy( req->fhandle, self->pFileHandle, 4 );
1035 
1036  ChunkList *list = new ChunkList();
1037  list->push_back( ChunkInfo( offset, size, buffer ) );
1038 
1040  MessageSendParams params;
1041  params.timeout = timeout;
1042  params.followRedirects = false;
1043  params.stateful = true;
1044  params.chunkList = list;
1046  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1047 
1048  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1049  }
1050 
1051  //------------------------------------------------------------------------
1052  // Read data pages at a given offset
1053  //------------------------------------------------------------------------
1054  XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1055  uint64_t offset,
1056  uint32_t size,
1057  void *buffer,
1058  ResponseHandler *handler,
1059  uint16_t timeout )
1060  {
1061  int issupported = true;
1062  AnyObject obj;
1064  int protver = 0;
1065  XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1066  if( st1.IsOK() && st2.IsOK() )
1067  {
1068  int *ptr = 0;
1069  obj.Get( ptr );
1070  issupported = ( *ptr & kXR_suppgrw ) && ( protver >= kXR_PROTPGRWVERSION );
1071  delete ptr;
1072  }
1073  else
1074  issupported = false;
1075 
1076  if( !issupported )
1077  {
1078  DefaultEnv::GetLog()->Debug( FileMsg, "[0x%x@%s] PgRead not supported; substituting with Read.",
1079  self.get(), self->pFileUrl->GetURL().c_str() );
1080  ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1081  auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1082  if( !st.IsOK() ) delete substitHandler;
1083  return st;
1084  }
1085 
1086  ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1087  auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1088  if( !st.IsOK() ) delete pgHandler;
1089  return st;
1090  }
1091 
1092  XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1093  uint64_t offset,
1094  uint32_t size,
1095  size_t pgnb,
1096  void *buffer,
1097  PgReadHandler *handler,
1098  uint16_t timeout )
1099  {
1100  if( size > (uint32_t)XrdSys::PageSize )
1101  return XRootDStatus( stError, errInvalidArgs, EINVAL,
1102  "PgRead retry size exceeded 4KB." );
1103 
1104  ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1105  XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1106  if( !st.IsOK() ) delete retryHandler;
1107  return st;
1108  }
1109 
1110  XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1111  uint64_t offset,
1112  uint32_t size,
1113  void *buffer,
1114  uint16_t flags,
1115  ResponseHandler *handler,
1116  uint16_t timeout )
1117  {
1118  XrdSysMutexHelper scopedLock( self->pMutex );
1119 
1120  if( self->pFileState == Error ) return self->pStatus;
1121 
1122  if( self->pFileState != Opened && self->pFileState != Recovering )
1123  return XRootDStatus( stError, errInvalidOp );
1124 
1125  Log *log = DefaultEnv::GetLog();
1126  log->Debug( FileMsg, "[0x%x@%s] Sending a pgread command for handle 0x%x to "
1127  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1128  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1129 
1130  Message *msg;
1131  ClientPgReadRequest *req;
1132  MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1133 
1134  req->requestid = kXR_pgread;
1135  req->offset = offset;
1136  req->rlen = size;
1137  memcpy( req->fhandle, self->pFileHandle, 4 );
1138 
1139  //--------------------------------------------------------------------------
1140  // Now adjust the message size so it can hold PgRead arguments
1141  //--------------------------------------------------------------------------
1142  req->dlen = sizeof( ClientPgReadReqArgs );
1143  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1144  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1145  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1146  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1147  args->reqflags = flags;
1148 
1149  ChunkList *list = new ChunkList();
1150  list->push_back( ChunkInfo( offset, size, buffer ) );
1151 
1153  MessageSendParams params;
1154  params.timeout = timeout;
1155  params.followRedirects = false;
1156  params.stateful = true;
1157  params.chunkList = list;
1159  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1160 
1161  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1162  }
1163 
1164  //----------------------------------------------------------------------------
1165  // Write a data chunk at a given offset - async
1166  //----------------------------------------------------------------------------
1167  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1168  uint64_t offset,
1169  uint32_t size,
1170  const void *buffer,
1171  ResponseHandler *handler,
1172  uint16_t timeout )
1173  {
1174  XrdSysMutexHelper scopedLock( self->pMutex );
1175 
1176  if( self->pFileState == Error ) return self->pStatus;
1177 
1178  if( self->pFileState != Opened && self->pFileState != Recovering )
1179  return XRootDStatus( stError, errInvalidOp );
1180 
1181  Log *log = DefaultEnv::GetLog();
1182  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
1183  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1184  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1185 
1186  Message *msg;
1187  ClientWriteRequest *req;
1188  MessageUtils::CreateRequest( msg, req );
1189 
1190  req->requestid = kXR_write;
1191  req->offset = offset;
1192  req->dlen = size;
1193  memcpy( req->fhandle, self->pFileHandle, 4 );
1194 
1195  ChunkList *list = new ChunkList();
1196  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1197 
1198  MessageSendParams params;
1199  params.timeout = timeout;
1200  params.followRedirects = false;
1201  params.stateful = true;
1202  params.chunkList = list;
1203 
1205 
1207  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1208 
1209  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1210  }
1211 
1212  //----------------------------------------------------------------------------
1213  // Write a data chunk at a given offset
1214  //----------------------------------------------------------------------------
1215  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1216  uint64_t offset,
1217  Buffer &&buffer,
1218  ResponseHandler *handler,
1219  uint16_t timeout )
1220  {
1221  //--------------------------------------------------------------------------
1222  // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1223  // so fall back to normal write
1224  //--------------------------------------------------------------------------
1225  if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1226  {
1227  Log *log = DefaultEnv::GetLog();
1228  log->Info( FileMsg, "[0x%x@%s] Buffer is not page aligned (4KB), cannot "
1229  "convert it to kernel space buffer.", self.get(), self->pFileUrl->GetURL().c_str(),
1230  *((uint32_t*)self->pFileHandle) );
1231 
1232  void *buff = buffer.GetBuffer();
1233  uint32_t size = buffer.GetSize();
1234  ReleaseBufferHandler *wrtHandler =
1235  new ReleaseBufferHandler( std::move( buffer ), handler );
1236  XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1237  if( !st.IsOK() )
1238  {
1239  buffer = std::move( wrtHandler->GetBuffer() );
1240  delete wrtHandler;
1241  }
1242  return st;
1243  }
1244 
1245  //--------------------------------------------------------------------------
1246  // Transfer the data from user space to kernel space
1247  //--------------------------------------------------------------------------
1248  uint32_t length = buffer.GetSize();
1249  char *ubuff = buffer.Release();
1250 
1251  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1252  ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1253  if( ret < 0 )
1254  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1255 
1256  //--------------------------------------------------------------------------
1257  // Now create a write request and enqueue it
1258  //--------------------------------------------------------------------------
1259  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1260  }
1261 
1262  //----------------------------------------------------------------------------
1263  // Write a data from a given file descriptor at a given offset - async
1264  //----------------------------------------------------------------------------
1265  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1266  uint64_t offset,
1267  uint32_t size,
1268  Optional<uint64_t> fdoff,
1269  int fd,
1270  ResponseHandler *handler,
1271  uint16_t timeout )
1272  {
1273  //--------------------------------------------------------------------------
1274  // Read the data from the file descriptor into a kernel buffer
1275  //--------------------------------------------------------------------------
1276  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1277  ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1278  XrdSys::Read( fd, *kbuff, size );
1279  if( ret < 0 )
1280  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1281 
1282  //--------------------------------------------------------------------------
1283  // Now create a write request and enqueue it
1284  //--------------------------------------------------------------------------
1285  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1286  }
1287 
1288  //----------------------------------------------------------------------------
1289  // Write number of pages at a given offset - async
1290  //----------------------------------------------------------------------------
1291  XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1292  uint64_t offset,
1293  uint32_t size,
1294  const void *buffer,
1295  std::vector<uint32_t> &cksums,
1296  ResponseHandler *handler,
1297  uint16_t timeout )
1298  {
1299  //--------------------------------------------------------------------------
1300  // Resolve timeout value
1301  //--------------------------------------------------------------------------
1302  if( timeout == 0 )
1303  {
1304  int val = DefaultRequestTimeout;
1305  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1306  timeout = val;
1307  }
1308 
1309  //--------------------------------------------------------------------------
1310  // Validate the digest vector size
1311  //--------------------------------------------------------------------------
1312  if( cksums.empty() )
1313  {
1314  const char *data = static_cast<const char*>( buffer );
1315  XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1316  }
1317  else
1318  {
1319  size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1320  if( crc32cCnt != cksums.size() )
1321  return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1322  }
1323 
1324  //--------------------------------------------------------------------------
1325  // Create a context for PgWrite operation
1326  //--------------------------------------------------------------------------
1327  struct pgwrt_t
1328  {
1329  pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1330  {
1331  }
1332 
1333  ~pgwrt_t()
1334  {
1335  if( handler )
1336  {
1337  // if all retries were successful no error status was set
1338  if( !status ) status = new XRootDStatus();
1339  handler->HandleResponse( status, nullptr );
1340  }
1341  }
1342 
1343  static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1344  {
1345  if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1346  return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1347  }
1348 
1349  inline void SetStatus( XRootDStatus* s )
1350  {
1351  if( !status ) status = s;
1352  else delete s;
1353  }
1354 
1355  ResponseHandler *handler;
1356  XRootDStatus *status;
1357  };
1358  auto pgwrt = std::make_shared<pgwrt_t>( handler );
1359 
1360  int fLen, lLen;
1361  XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1362  uint32_t fstpglen = fLen;
1363 
1364  time_t start = ::time( nullptr );
1365  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1366  {
1367  std::unique_ptr<AnyObject> scoped( r );
1368  // if the request failed simply pass the status to the
1369  // user handler
1370  if( !s->IsOK() )
1371  {
1372  pgwrt->SetStatus( s );
1373  return; // pgwrt destructor will call the handler
1374  }
1375  // also if the request was sucessful and there were no
1376  // corrupted pages pass the status to the user handler
1377  RetryInfo *inf = nullptr;
1378  r->Get( inf );
1379  if( !inf->NeedRetry() )
1380  {
1381  pgwrt->SetStatus( s );
1382  return; // pgwrt destructor will call the handler
1383  }
1384  delete s;
1385  // first adjust the timeout value
1386  uint16_t elapsed = ::time( nullptr ) - start;
1387  if( elapsed >= timeout )
1388  {
1389  pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1390  return; // pgwrt destructor will call the handler
1391  }
1392  else timeout -= elapsed;
1393  // retransmit the corrupted pages
1394  for( size_t i = 0; i < inf->Size(); ++i )
1395  {
1396  auto tpl = inf->At( i );
1397  uint64_t pgoff = std::get<0>( tpl );
1398  uint32_t pglen = std::get<1>( tpl );
1399  const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1400  uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1401  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1402  {
1403  std::unique_ptr<AnyObject> scoped( r );
1404  // if we failed simply set the status
1405  if( !s->IsOK() )
1406  {
1407  pgwrt->SetStatus( s );
1408  return; // the destructor will call the handler
1409  }
1410  delete s;
1411  // otherwise check if the data were not corrupted again
1412  RetryInfo *inf = nullptr;
1413  r->Get( inf );
1414  if( inf->NeedRetry() ) // so we failed in the end
1415  {
1416  DefaultEnv::GetLog()->Warning( FileMsg, "[0x%x@%s] Failed retransmitting corrupted "
1417  "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1418  self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1419  pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1420  "Failed to retransmit corrupted page" ) );
1421  }
1422  else
1423  DefaultEnv::GetLog()->Info( FileMsg, "[0x%x@%s] Succesfuly retransmitted corrupted "
1424  "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1425  self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1426  } );
1427  auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1428  if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1429  DefaultEnv::GetLog()->Info( FileMsg, "[0x%x@%s] Retransmitting corrupted page: "
1430  "pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1431  self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1432  }
1433  } );
1434 
1435  auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1436  if( !st.IsOK() )
1437  {
1438  pgwrt->handler = nullptr;
1439  delete h;
1440  }
1441  return st;
1442  }
1443 
1444  //------------------------------------------------------------------------
1445  // Write number of pages at a given offset - async
1446  //------------------------------------------------------------------------
1447  XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1448  uint64_t offset,
1449  uint32_t size,
1450  const void *buffer,
1451  uint32_t digest,
1452  ResponseHandler *handler,
1453  uint16_t timeout )
1454  {
1455  std::vector<uint32_t> cksums{ digest };
1456  return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1457  }
1458 
1459  //------------------------------------------------------------------------
1460  // Write number of pages at a given offset - async
1461  //------------------------------------------------------------------------
1462  XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1463  uint64_t offset,
1464  uint32_t size,
1465  const void *buffer,
1466  std::vector<uint32_t> &cksums,
1467  kXR_char flags,
1468  ResponseHandler *handler,
1469  uint16_t timeout )
1470  {
1471  XrdSysMutexHelper scopedLock( self->pMutex );
1472 
1473  if( self->pFileState == Error ) return self->pStatus;
1474 
1475  if( self->pFileState != Opened && self->pFileState != Recovering )
1476  return XRootDStatus( stError, errInvalidOp );
1477 
1478  Log *log = DefaultEnv::GetLog();
1479  log->Debug( FileMsg, "[0x%x@%s] Sending a pgwrite command for handle 0x%x to "
1480  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1481  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1482 
1483  //--------------------------------------------------------------------------
1484  // Create the message
1485  //--------------------------------------------------------------------------
1486  Message *msg;
1487  ClientPgWriteRequest *req;
1488  MessageUtils::CreateRequest( msg, req );
1489 
1490  req->requestid = kXR_pgwrite;
1491  req->offset = offset;
1492  req->dlen = size + cksums.size() * sizeof( uint32_t );
1493  req->reqflags = flags;
1494  memcpy( req->fhandle, self->pFileHandle, 4 );
1495 
1496  ChunkList *list = new ChunkList();
1497  list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1498 
1499  MessageSendParams params;
1500  params.timeout = timeout;
1501  params.followRedirects = false;
1502  params.stateful = true;
1503  params.chunkList = list;
1504  params.crc32cDigests.swap( cksums );
1505 
1507 
1509  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1510 
1511  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1512  }
1513 
1514  //----------------------------------------------------------------------------
1515  // Commit all pending disk writes - async
1516  //----------------------------------------------------------------------------
1517  XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1518  ResponseHandler *handler,
1519  uint16_t timeout )
1520  {
1521  XrdSysMutexHelper scopedLock( self->pMutex );
1522 
1523  if( self->pFileState == Error ) return self->pStatus;
1524 
1525  if( self->pFileState != Opened && self->pFileState != Recovering )
1526  return XRootDStatus( stError, errInvalidOp );
1527 
1528  Log *log = DefaultEnv::GetLog();
1529  log->Debug( FileMsg, "[0x%x@%s] Sending a sync command for handle 0x%x to "
1530  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1531  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1532 
1533  Message *msg;
1534  ClientSyncRequest *req;
1535  MessageUtils::CreateRequest( msg, req );
1536 
1537  req->requestid = kXR_sync;
1538  memcpy( req->fhandle, self->pFileHandle, 4 );
1539 
1540  MessageSendParams params;
1541  params.timeout = timeout;
1542  params.followRedirects = false;
1543  params.stateful = true;
1545 
1547  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1548 
1549  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1550  }
1551 
1552  //----------------------------------------------------------------------------
1553  // Truncate the file to a particular size - async
1554  //----------------------------------------------------------------------------
1555  XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1556  uint64_t size,
1557  ResponseHandler *handler,
1558  uint16_t timeout )
1559  {
1560  XrdSysMutexHelper scopedLock( self->pMutex );
1561 
1562  if( self->pFileState == Error ) return self->pStatus;
1563 
1564  if( self->pFileState != Opened && self->pFileState != Recovering )
1565  return XRootDStatus( stError, errInvalidOp );
1566 
1567  Log *log = DefaultEnv::GetLog();
1568  log->Debug( FileMsg, "[0x%x@%s] Sending a truncate command for handle 0x%x to "
1569  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1570  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1571 
1572  Message *msg;
1573  ClientTruncateRequest *req;
1574  MessageUtils::CreateRequest( msg, req );
1575 
1576  req->requestid = kXR_truncate;
1577  memcpy( req->fhandle, self->pFileHandle, 4 );
1578  req->offset = size;
1579 
1580  MessageSendParams params;
1581  params.timeout = timeout;
1582  params.followRedirects = false;
1583  params.stateful = true;
1585 
1587  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1588 
1589  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1590  }
1591 
1592  //----------------------------------------------------------------------------
1593  // Read scattered data chunks in one operation - async
1594  //----------------------------------------------------------------------------
1595  XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1596  const ChunkList &chunks,
1597  void *buffer,
1598  ResponseHandler *handler,
1599  uint16_t timeout )
1600  {
1601  //--------------------------------------------------------------------------
1602  // Sanity check
1603  //--------------------------------------------------------------------------
1604  XrdSysMutexHelper scopedLock( self->pMutex );
1605 
1606  if( self->pFileState == Error ) return self->pStatus;
1607 
1608  if( self->pFileState != Opened && self->pFileState != Recovering )
1609  return XRootDStatus( stError, errInvalidOp );
1610 
1611  Log *log = DefaultEnv::GetLog();
1612  log->Debug( FileMsg, "[0x%x@%s] Sending a vector read command for handle "
1613  "0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
1614  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1615 
1616  //--------------------------------------------------------------------------
1617  // Build the message
1618  //--------------------------------------------------------------------------
1619  Message *msg;
1620  ClientReadVRequest *req;
1621  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1622 
1623  req->requestid = kXR_readv;
1624  req->dlen = sizeof(readahead_list)*chunks.size();
1625 
1626  ChunkList *list = new ChunkList();
1627  char *cursor = (char*)buffer;
1628 
1629  //--------------------------------------------------------------------------
1630  // Copy the chunk info
1631  //--------------------------------------------------------------------------
1632  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1633  for( size_t i = 0; i < chunks.size(); ++i )
1634  {
1635  dataChunk[i].rlen = chunks[i].length;
1636  dataChunk[i].offset = chunks[i].offset;
1637  memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1638 
1639  void *chunkBuffer;
1640  if( cursor )
1641  {
1642  chunkBuffer = cursor;
1643  cursor += chunks[i].length;
1644  }
1645  else
1646  chunkBuffer = chunks[i].buffer;
1647 
1648  list->push_back( ChunkInfo( chunks[i].offset,
1649  chunks[i].length,
1650  chunkBuffer ) );
1651  }
1652 
1653  //--------------------------------------------------------------------------
1654  // Send the message
1655  //--------------------------------------------------------------------------
1656  MessageSendParams params;
1657  params.timeout = timeout;
1658  params.followRedirects = false;
1659  params.stateful = true;
1660  params.chunkList = list;
1662 
1664  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1665 
1666  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1667  }
1668 
1669  //------------------------------------------------------------------------
1670  // Write scattered data chunks in one operation - async
1671  //------------------------------------------------------------------------
1672  XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1673  const ChunkList &chunks,
1674  ResponseHandler *handler,
1675  uint16_t timeout )
1676  {
1677  //--------------------------------------------------------------------------
1678  // Sanity check
1679  //--------------------------------------------------------------------------
1680  XrdSysMutexHelper scopedLock( self->pMutex );
1681 
1682  if( self->pFileState == Error ) return self->pStatus;
1683 
1684  if( self->pFileState != Opened && self->pFileState != Recovering )
1685  return XRootDStatus( stError, errInvalidOp );
1686 
1687  Log *log = DefaultEnv::GetLog();
1688  log->Debug( FileMsg, "[0x%x@%s] Sending a vector write command for handle "
1689  "0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
1690  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1691 
1692  //--------------------------------------------------------------------------
1693  // Determine the size of the payload
1694  //--------------------------------------------------------------------------
1695 
1696  // the size of write vector
1697  uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1698 
1699  //--------------------------------------------------------------------------
1700  // Build the message
1701  //--------------------------------------------------------------------------
1702  Message *msg;
1703  ClientWriteVRequest *req;
1704  MessageUtils::CreateRequest( msg, req, payloadSize );
1705 
1706  req->requestid = kXR_writev;
1707  req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1708 
1709  ChunkList *list = new ChunkList();
1710 
1711  //--------------------------------------------------------------------------
1712  // Copy the chunk info
1713  //--------------------------------------------------------------------------
1714  XrdProto::write_list *writeList =
1715  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1716 
1717 
1718 
1719  for( size_t i = 0; i < chunks.size(); ++i )
1720  {
1721  writeList[i].wlen = chunks[i].length;
1722  writeList[i].offset = chunks[i].offset;
1723  memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1724 
1725  list->push_back( ChunkInfo( chunks[i].offset,
1726  chunks[i].length,
1727  chunks[i].buffer ) );
1728  }
1729 
1730  //--------------------------------------------------------------------------
1731  // Send the message
1732  //--------------------------------------------------------------------------
1733  MessageSendParams params;
1734  params.timeout = timeout;
1735  params.followRedirects = false;
1736  params.stateful = true;
1737  params.chunkList = list;
1739 
1741  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1742 
1743  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1744  }
1745 
1746  //------------------------------------------------------------------------
1747  // Write scattered buffers in one operation - async
1748  //------------------------------------------------------------------------
1749  XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1750  uint64_t offset,
1751  const struct iovec *iov,
1752  int iovcnt,
1753  ResponseHandler *handler,
1754  uint16_t timeout )
1755  {
1756  XrdSysMutexHelper scopedLock( self->pMutex );
1757 
1758  if( self->pFileState == Error ) return self->pStatus;
1759 
1760  if( self->pFileState != Opened && self->pFileState != Recovering )
1761  return XRootDStatus( stError, errInvalidOp );
1762 
1763  Log *log = DefaultEnv::GetLog();
1764  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
1765  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1766  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1767 
1768  Message *msg;
1769  ClientWriteRequest *req;
1770  MessageUtils::CreateRequest( msg, req );
1771 
1772  ChunkList *list = new ChunkList();
1773 
1774  uint32_t size = 0;
1775  for( int i = 0; i < iovcnt; ++i )
1776  {
1777  if( iov[i].iov_len == 0 ) continue;
1778  size += iov[i].iov_len;
1779  list->push_back( ChunkInfo( 0, iov[i].iov_len,
1780  (char*)iov[i].iov_base ) );
1781  }
1782 
1783  req->requestid = kXR_write;
1784  req->offset = offset;
1785  req->dlen = size;
1786  memcpy( req->fhandle, self->pFileHandle, 4 );
1787 
1788  MessageSendParams params;
1789  params.timeout = timeout;
1790  params.followRedirects = false;
1791  params.stateful = true;
1792  params.chunkList = list;
1793 
1795 
1797  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1798 
1799  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1800  }
1801 
1802  //------------------------------------------------------------------------
1803  // Read data into scattered buffers in one operation - async
1804  //------------------------------------------------------------------------
1805  XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1806  uint64_t offset,
1807  struct iovec *iov,
1808  int iovcnt,
1809  ResponseHandler *handler,
1810  uint16_t timeout )
1811  {
1812  XrdSysMutexHelper scopedLock( self->pMutex );
1813 
1814  if( self->pFileState == Error ) return self->pStatus;
1815 
1816  if( self->pFileState != Opened && self->pFileState != Recovering )
1817  return XRootDStatus( stError, errInvalidOp );
1818 
1819  Log *log = DefaultEnv::GetLog();
1820  log->Debug( FileMsg, "[0x%x@%s] Sending a read command for handle 0x%x to "
1821  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1822  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1823 
1824  Message *msg;
1825  ClientReadRequest *req;
1826  MessageUtils::CreateRequest( msg, req );
1827 
1828  // calculate the total read size
1829  size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1830  {
1831  return acc + rhs.iov_len;
1832  } );
1833  req->requestid = kXR_read;
1834  req->offset = offset;
1835  req->rlen = size;
1836  msg->SetVirtReqID( kXR_virtReadv );
1837  memcpy( req->fhandle, self->pFileHandle, 4 );
1838 
1839  ChunkList *list = new ChunkList();
1840  list->reserve( iovcnt );
1841  uint64_t choff = offset;
1842  for( int i = 0; i < iovcnt; ++i )
1843  {
1844  list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1845  choff += iov[i].iov_len;
1846  }
1847 
1849  MessageSendParams params;
1850  params.timeout = timeout;
1851  params.followRedirects = false;
1852  params.stateful = true;
1853  params.chunkList = list;
1855  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1856 
1857  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1858  }
1859 
1860  //----------------------------------------------------------------------------
1861  // Performs a custom operation on an open file, server implementation
1862  // dependent - async
1863  //----------------------------------------------------------------------------
1864  XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1865  const Buffer &arg,
1866  ResponseHandler *handler,
1867  uint16_t timeout )
1868  {
1869  XrdSysMutexHelper scopedLock( self->pMutex );
1870 
1871  if( self->pFileState == Error ) return self->pStatus;
1872 
1873  if( self->pFileState != Opened && self->pFileState != Recovering )
1874  return XRootDStatus( stError, errInvalidOp );
1875 
1876  Log *log = DefaultEnv::GetLog();
1877  log->Debug( FileMsg, "[0x%x@%s] Sending a fcntl command for handle 0x%x to "
1878  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1879  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1880 
1881  Message *msg;
1882  ClientQueryRequest *req;
1883  MessageUtils::CreateRequest( msg, req, arg.GetSize() );
1884 
1885  req->requestid = kXR_query;
1886  req->infotype = kXR_Qopaqug;
1887  req->dlen = arg.GetSize();
1888  memcpy( req->fhandle, self->pFileHandle, 4 );
1889  msg->Append( arg.GetBuffer(), arg.GetSize(), 24 );
1890 
1891  MessageSendParams params;
1892  params.timeout = timeout;
1893  params.followRedirects = false;
1894  params.stateful = true;
1896 
1898  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1899 
1900  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1901  }
1902 
1903  //----------------------------------------------------------------------------
1904  // Get access token to a file - async
1905  //----------------------------------------------------------------------------
1906  XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
1907  ResponseHandler *handler,
1908  uint16_t timeout )
1909  {
1910  XrdSysMutexHelper scopedLock( self->pMutex );
1911 
1912  if( self->pFileState == Error ) return self->pStatus;
1913 
1914  if( self->pFileState != Opened && self->pFileState != Recovering )
1915  return XRootDStatus( stError, errInvalidOp );
1916 
1917  Log *log = DefaultEnv::GetLog();
1918  log->Debug( FileMsg, "[0x%x@%s] Sending a visa command for handle 0x%x to "
1919  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1920  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1921 
1922  Message *msg;
1923  ClientQueryRequest *req;
1924  MessageUtils::CreateRequest( msg, req );
1925 
1926  req->requestid = kXR_query;
1927  req->infotype = kXR_Qvisa;
1928  memcpy( req->fhandle, self->pFileHandle, 4 );
1929 
1930  MessageSendParams params;
1931  params.timeout = timeout;
1932  params.followRedirects = false;
1933  params.stateful = true;
1935 
1937  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1938 
1939  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1940  }
1941 
1942  //------------------------------------------------------------------------
1943  // Set extended attributes - async
1944  //------------------------------------------------------------------------
1945  XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
1946  const std::vector<xattr_t> &attrs,
1947  ResponseHandler *handler,
1948  uint16_t timeout )
1949  {
1950  XrdSysMutexHelper scopedLock( self->pMutex );
1951 
1952  if( self->pFileState == Error ) return self->pStatus;
1953 
1954  if( self->pFileState != Opened && self->pFileState != Recovering )
1955  return XRootDStatus( stError, errInvalidOp );
1956 
1957  Log *log = DefaultEnv::GetLog();
1958  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr set command for handle 0x%x to "
1959  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1960  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1961 
1962  //--------------------------------------------------------------------------
1963  // Issue a new fattr get request
1964  //--------------------------------------------------------------------------
1965  return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
1966  }
1967 
1968  //------------------------------------------------------------------------
1969  // Get extended attributes - async
1970  //------------------------------------------------------------------------
1971  XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
1972  const std::vector<std::string> &attrs,
1973  ResponseHandler *handler,
1974  uint16_t timeout )
1975  {
1976  XrdSysMutexHelper scopedLock( self->pMutex );
1977 
1978  if( self->pFileState == Error ) return self->pStatus;
1979 
1980  if( self->pFileState != Opened && self->pFileState != Recovering )
1981  return XRootDStatus( stError, errInvalidOp );
1982 
1983  Log *log = DefaultEnv::GetLog();
1984  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr get command for handle 0x%x to "
1985  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1986  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1987 
1988  //--------------------------------------------------------------------------
1989  // Issue a new fattr get request
1990  //--------------------------------------------------------------------------
1991  return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
1992  }
1993 
1994  //------------------------------------------------------------------------
1995  // Delete extended attributes - async
1996  //------------------------------------------------------------------------
1997  XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
1998  const std::vector<std::string> &attrs,
1999  ResponseHandler *handler,
2000  uint16_t timeout )
2001  {
2002  XrdSysMutexHelper scopedLock( self->pMutex );
2003 
2004  if( self->pFileState == Error ) return self->pStatus;
2005 
2006  if( self->pFileState != Opened && self->pFileState != Recovering )
2007  return XRootDStatus( stError, errInvalidOp );
2008 
2009  Log *log = DefaultEnv::GetLog();
2010  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr del command for handle 0x%x to "
2011  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2012  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2013 
2014  //--------------------------------------------------------------------------
2015  // Issue a new fattr del request
2016  //--------------------------------------------------------------------------
2017  return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2018  }
2019 
2020  //------------------------------------------------------------------------
2021  // List extended attributes - async
2022  //------------------------------------------------------------------------
2023  XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2024  ResponseHandler *handler,
2025  uint16_t timeout )
2026  {
2027  XrdSysMutexHelper scopedLock( self->pMutex );
2028 
2029  if( self->pFileState == Error ) return self->pStatus;
2030 
2031  if( self->pFileState != Opened && self->pFileState != Recovering )
2032  return XRootDStatus( stError, errInvalidOp );
2033 
2034  Log *log = DefaultEnv::GetLog();
2035  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr list command for handle 0x%x to "
2036  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2037  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2038 
2039  //--------------------------------------------------------------------------
2040  // Issue a new fattr get request
2041  //--------------------------------------------------------------------------
2042  static const std::vector<std::string> nothing;
2043  return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2044  nothing, handler, timeout );
2045  }
2046 
2047  //------------------------------------------------------------------------
2057  //------------------------------------------------------------------------
2058  XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2059  kXR_char code,
2060  ResponseHandler *handler,
2061  uint16_t timeout )
2062  {
2063  XrdSysMutexHelper scopedLock( self->pMutex );
2064 
2065  if( self->pFileState == Error ) return self->pStatus;
2066 
2067  if( self->pFileState != Opened && self->pFileState != Recovering )
2068  return XRootDStatus( stError, errInvalidOp );
2069 
2070  Log *log = DefaultEnv::GetLog();
2071  log->Debug( FileMsg, "[0x%x@%s] Sending a checkpoint command for "
2072  "handle 0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
2073  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2074 
2075  Message *msg;
2076  ClientChkPointRequest *req;
2077  MessageUtils::CreateRequest( msg, req );
2078 
2079  req->requestid = kXR_chkpoint;
2080  req->opcode = code;
2081  memcpy( req->fhandle, self->pFileHandle, 4 );
2082 
2083  MessageSendParams params;
2084  params.timeout = timeout;
2085  params.followRedirects = false;
2086  params.stateful = true;
2087 
2089 
2091  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2092 
2093  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2094  }
2095 
2096  //------------------------------------------------------------------------
2106  //------------------------------------------------------------------------
2107  XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2108  uint64_t offset,
2109  uint32_t size,
2110  const void *buffer,
2111  ResponseHandler *handler,
2112  uint16_t timeout )
2113  {
2114  XrdSysMutexHelper scopedLock( self->pMutex );
2115 
2116  if( self->pFileState == Error ) return self->pStatus;
2117 
2118  if( self->pFileState != Opened && self->pFileState != Recovering )
2119  return XRootDStatus( stError, errInvalidOp );
2120 
2121  Log *log = DefaultEnv::GetLog();
2122  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
2123  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2124  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2125 
2126  Message *msg;
2127  ClientChkPointRequest *req;
2128  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2129 
2130  req->requestid = kXR_chkpoint;
2131  req->opcode = kXR_ckpXeq;
2132  req->dlen = 24; // as specified in the protocol specification
2133  memcpy( req->fhandle, self->pFileHandle, 4 );
2134 
2136  wrtreq->requestid = kXR_write;
2137  wrtreq->offset = offset;
2138  wrtreq->dlen = size;
2139  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2140 
2141  ChunkList *list = new ChunkList();
2142  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2143 
2144  MessageSendParams params;
2145  params.timeout = timeout;
2146  params.followRedirects = false;
2147  params.stateful = true;
2148  params.chunkList = list;
2149 
2151 
2153  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2154 
2155  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2156  }
2157 
2158  //------------------------------------------------------------------------
2168  //------------------------------------------------------------------------
2169  XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2170  uint64_t offset,
2171  const struct iovec *iov,
2172  int iovcnt,
2173  ResponseHandler *handler,
2174  uint16_t timeout )
2175  {
2176  XrdSysMutexHelper scopedLock( self->pMutex );
2177 
2178  if( self->pFileState == Error ) return self->pStatus;
2179 
2180  if( self->pFileState != Opened && self->pFileState != Recovering )
2181  return XRootDStatus( stError, errInvalidOp );
2182 
2183  Log *log = DefaultEnv::GetLog();
2184  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
2185  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2186  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2187 
2188  Message *msg;
2189  ClientChkPointRequest *req;
2190  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2191 
2192  req->requestid = kXR_chkpoint;
2193  req->opcode = kXR_ckpXeq;
2194  req->dlen = 24; // as specified in the protocol specification
2195  memcpy( req->fhandle, self->pFileHandle, 4 );
2196 
2197  ChunkList *list = new ChunkList();
2198  uint32_t size = 0;
2199  for( int i = 0; i < iovcnt; ++i )
2200  {
2201  if( iov[i].iov_len == 0 ) continue;
2202  size += iov[i].iov_len;
2203  list->push_back( ChunkInfo( 0, iov[i].iov_len,
2204  (char*)iov[i].iov_base ) );
2205  }
2206 
2208  wrtreq->requestid = kXR_write;
2209  wrtreq->offset = offset;
2210  wrtreq->dlen = size;
2211  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2212 
2213  MessageSendParams params;
2214  params.timeout = timeout;
2215  params.followRedirects = false;
2216  params.stateful = true;
2217  params.chunkList = list;
2218 
2220 
2222  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2223 
2224  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2225  }
2226 
2227  //----------------------------------------------------------------------------
2228  // Check if the file is open
2229  //----------------------------------------------------------------------------
2231  {
2232  XrdSysMutexHelper scopedLock( pMutex );
2233 
2234  if( pFileState == Opened || pFileState == Recovering )
2235  return true;
2236  return false;
2237  }
2238 
2239  //----------------------------------------------------------------------------
2240  // Set file property
2241  //----------------------------------------------------------------------------
2242  bool FileStateHandler::SetProperty( const std::string &name,
2243  const std::string &value )
2244  {
2245  XrdSysMutexHelper scopedLock( pMutex );
2246  if( name == "ReadRecovery" )
2247  {
2248  if( value == "true" ) pDoRecoverRead = true;
2249  else pDoRecoverRead = false;
2250  return true;
2251  }
2252  else if( name == "WriteRecovery" )
2253  {
2254  if( value == "true" ) pDoRecoverWrite = true;
2255  else pDoRecoverWrite = false;
2256  return true;
2257  }
2258  else if( name == "FollowRedirects" )
2259  {
2260  if( value == "true" ) pFollowRedirects = true;
2261  else pFollowRedirects = false;
2262  return true;
2263  }
2264  else if( name == "BundledClose" )
2265  {
2266  if( value == "true" ) pAllowBundledClose = true;
2267  else pAllowBundledClose = false;
2268  return true;
2269  }
2270  return false;
2271  }
2272 
2273  //----------------------------------------------------------------------------
2274  // Get file property
2275  //----------------------------------------------------------------------------
2276  bool FileStateHandler::GetProperty( const std::string &name,
2277  std::string &value ) const
2278  {
2279  XrdSysMutexHelper scopedLock( pMutex );
2280  if( name == "ReadRecovery" )
2281  {
2282  if( pDoRecoverRead ) value = "true";
2283  else value = "false";
2284  return true;
2285  }
2286  else if( name == "WriteRecovery" )
2287  {
2288  if( pDoRecoverWrite ) value = "true";
2289  else value = "false";
2290  return true;
2291  }
2292  else if( name == "FollowRedirects" )
2293  {
2294  if( pFollowRedirects ) value = "true";
2295  else value = "false";
2296  return true;
2297  }
2298  else if( name == "DataServer" && pDataServer )
2299  { value = pDataServer->GetHostId(); return true; }
2300  else if( name == "LastURL" && pDataServer )
2301  { value = pDataServer->GetURL(); return true; }
2302  else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2303  { value = pWrtRecoveryRedir->GetHostId(); return true; }
2304  value = "";
2305  return false;
2306  }
2307 
2308  //----------------------------------------------------------------------------
2309  // Process the results of the opening operation
2310  //----------------------------------------------------------------------------
2312  const OpenInfo *openInfo,
2313  const HostList *hostList )
2314  {
2315  Log *log = DefaultEnv::GetLog();
2316  XrdSysMutexHelper scopedLock( pMutex );
2317 
2318  //--------------------------------------------------------------------------
2319  // Assign the data server and the load balancer
2320  //--------------------------------------------------------------------------
2321  std::string lastServer = pFileUrl->GetHostId();
2322  if( hostList )
2323  {
2324  delete pDataServer;
2325  delete pLoadBalancer;
2326  pLoadBalancer = 0;
2327  delete pWrtRecoveryRedir;
2328  pWrtRecoveryRedir = 0;
2329 
2330  pDataServer = new URL( hostList->back().url );
2331  pDataServer->SetParams( pFileUrl->GetParams() );
2332  if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2333  lastServer = pDataServer->GetHostId();
2334  HostList::const_iterator itC;
2335  URL::ParamsMap params = pDataServer->GetParams();
2336  for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2337  {
2338  MessageUtils::MergeCGI( params,
2339  itC->url.GetParams(),
2340  true );
2341  }
2342  pDataServer->SetParams( params );
2343 
2344  HostList::const_reverse_iterator it;
2345  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2346  if( it->loadBalancer )
2347  {
2348  pLoadBalancer = new URL( it->url );
2349  break;
2350  }
2351 
2352  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2353  if( it->flags & kXR_recoverWrts )
2354  {
2355  pWrtRecoveryRedir = new URL( it->url );
2356  break;
2357  }
2358  }
2359 
2360  log->Debug( FileMsg, "[0x%x@%s] Open has returned with status %s",
2361  this, pFileUrl->GetURL().c_str(), status->ToStr().c_str() );
2362 
2363  if( pDataServer && !pDataServer->IsLocalFile() )
2364  {
2365  //------------------------------------------------------------------------
2366  // Check if we are using a secure connection
2367  //------------------------------------------------------------------------
2368  XrdCl::AnyObject isencobj;
2370  QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2371  if( st.IsOK() )
2372  {
2373  bool *isenc;
2374  isencobj.Get( isenc );
2375  pIsChannelEncrypted = *isenc;
2376  delete isenc;
2377  }
2378  }
2379 
2380  //--------------------------------------------------------------------------
2381  // We have failed
2382  //--------------------------------------------------------------------------
2383  pStatus = *status;
2384  if( !pStatus.IsOK() || !openInfo )
2385  {
2386  log->Debug( FileMsg, "[0x%x@%s] Error while opening at %s: %s",
2387  this, pFileUrl->GetURL().c_str(), lastServer.c_str(),
2388  pStatus.ToStr().c_str() );
2389  FailQueuedMessages( pStatus );
2390  pFileState = Error;
2391 
2392  //------------------------------------------------------------------------
2393  // Report to monitoring
2394  //------------------------------------------------------------------------
2396  if( mon )
2397  {
2399  i.file = pFileUrl;
2400  i.status = status;
2402  mon->Event( Monitor::EvErrIO, &i );
2403  }
2404  }
2405  //--------------------------------------------------------------------------
2406  // We have succeeded
2407  //--------------------------------------------------------------------------
2408  else
2409  {
2410  //------------------------------------------------------------------------
2411  // Store the response info
2412  //------------------------------------------------------------------------
2413  openInfo->GetFileHandle( pFileHandle );
2414  pSessionId = openInfo->GetSessionId();
2415  if( openInfo->GetStatInfo() )
2416  {
2417  delete pStatInfo;
2418  pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2419  }
2420 
2421  log->Debug( FileMsg, "[0x%x@%s] successfully opened at %s, handle: 0x%x, "
2422  "session id: %ld", this, pFileUrl->GetURL().c_str(),
2423  pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2424  pSessionId );
2425 
2426  //------------------------------------------------------------------------
2427  // Inform the monitoring about opening success
2428  //------------------------------------------------------------------------
2429  gettimeofday( &pOpenTime, 0 );
2431  if( mon )
2432  {
2434  i.file = pFileUrl;
2435  i.dataServer = pDataServer->GetHostId();
2436  i.oFlags = pOpenFlags;
2437  i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2438  mon->Event( Monitor::EvOpen, &i );
2439  }
2440 
2441  //------------------------------------------------------------------------
2442  // Resend the queued messages if any
2443  //------------------------------------------------------------------------
2444  ReSendQueuedMessages();
2445  pFileState = Opened;
2446  }
2447  }
2448 
2449  //----------------------------------------------------------------------------
2450  // Process the results of the closing operation
2451  //----------------------------------------------------------------------------
2453  {
2454  Log *log = DefaultEnv::GetLog();
2455  XrdSysMutexHelper scopedLock( pMutex );
2456 
2457  log->Debug( FileMsg, "[0x%x@%s] Close returned from %s with: %s", this,
2458  pFileUrl->GetURL().c_str(), pDataServer->GetHostId().c_str(),
2459  status->ToStr().c_str() );
2460 
2461  log->Dump( FileMsg, "[0x%x@%s] Items in the fly %d, queued for recovery %d",
2462  this, pFileUrl->GetURL().c_str(), pInTheFly.size(),
2463  pToBeRecovered.size() );
2464 
2465  MonitorClose( status );
2466  ResetMonitoringVars();
2467 
2468  pStatus = *status;
2469  pFileState = Closed;
2470  }
2471 
2472  //----------------------------------------------------------------------------
2473  // Handle an error while sending a stateful message
2474  //----------------------------------------------------------------------------
2475  void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2476  XRootDStatus *status,
2477  Message *message,
2478  ResponseHandler *userHandler,
2479  MessageSendParams &sendParams )
2480  {
2481  //--------------------------------------------------------------------------
2482  // It may be a redirection
2483  //--------------------------------------------------------------------------
2484  if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2485  {
2486  static const std::string root = "root", xroot = "xroot", file = "file",
2487  roots = "roots", xroots = "xroots";
2488  std::string msg = status->GetErrorMessage();
2489  if( !msg.compare( 0, root.size(), root ) ||
2490  !msg.compare( 0, xroot.size(), xroot ) ||
2491  !msg.compare( 0, file.size(), file ) ||
2492  !msg.compare( 0, roots.size(), roots ) ||
2493  !msg.compare( 0, xroots.size(), xroots ) )
2494  {
2495  FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2496  return;
2497  }
2498  }
2499 
2500  //--------------------------------------------------------------------------
2501  // Handle error
2502  //--------------------------------------------------------------------------
2503  Log *log = DefaultEnv::GetLog();
2504  XrdSysMutexHelper scopedLock( self->pMutex );
2505  self->pInTheFly.erase( message );
2506 
2507  log->Dump( FileMsg, "[0x%x@%s] File state error encountered. Message %s "
2508  "returned with %s", self.get(), self->pFileUrl->GetURL().c_str(),
2509  message->GetDescription().c_str(), status->ToStr().c_str() );
2510 
2511  //--------------------------------------------------------------------------
2512  // Report to monitoring
2513  //--------------------------------------------------------------------------
2515  if( mon )
2516  {
2518  i.file = self->pFileUrl;
2519  i.status = status;
2520 
2521  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2522  switch( req->header.requestid )
2523  {
2524  case kXR_read: i.opCode = Monitor::ErrorInfo::ErrRead; break;
2530  default: i.opCode = Monitor::ErrorInfo::ErrUnc;
2531  }
2532 
2533  mon->Event( Monitor::EvErrIO, &i );
2534  }
2535 
2536  //--------------------------------------------------------------------------
2537  // The message is not recoverable
2538  // (message using a kernel buffer is not recoverable by definition)
2539  //--------------------------------------------------------------------------
2540  if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2541  {
2542  log->Error( FileMsg, "[0x%x@%s] Fatal file state error. Message %s "
2543  "returned with %s", self.get(), self->pFileUrl->GetURL().c_str(),
2544  message->GetDescription().c_str(), status->ToStr().c_str() );
2545 
2546  self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2547  delete status;
2548  return;
2549  }
2550 
2551  //--------------------------------------------------------------------------
2552  // Insert the message to the recovery queue and start the recovery
2553  // procedure if we don't have any more message in the fly
2554  //--------------------------------------------------------------------------
2555  self->pCloseReason = *status;
2556  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2557  delete status;
2558  }
2559 
2560  //----------------------------------------------------------------------------
2561  // Handle stateful redirect
2562  //----------------------------------------------------------------------------
2563  void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2564  const std::string &redirectUrl,
2565  Message *message,
2566  ResponseHandler *userHandler,
2567  MessageSendParams &sendParams )
2568  {
2569  XrdSysMutexHelper scopedLock( self->pMutex );
2570  self->pInTheFly.erase( message );
2571 
2572  //--------------------------------------------------------------------------
2573  // Register the state redirect url and append the new cgi information to
2574  // the file URL
2575  //--------------------------------------------------------------------------
2576  if( !self->pStateRedirect )
2577  {
2578  std::ostringstream o;
2579  self->pStateRedirect = new URL( redirectUrl );
2580  URL::ParamsMap params = self->pFileUrl->GetParams();
2581  MessageUtils::MergeCGI( params,
2582  self->pStateRedirect->GetParams(),
2583  false );
2584  self->pFileUrl->SetParams( params );
2585  }
2586 
2587  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2588  }
2589 
2590  //----------------------------------------------------------------------------
2591  // Handle stateful response
2592  //----------------------------------------------------------------------------
2593  void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2594  XRootDStatus *status,
2595  Message *message,
2596  AnyObject *response,
2597  HostList */*urlList*/ )
2598  {
2599  Log *log = DefaultEnv::GetLog();
2600  XrdSysMutexHelper scopedLock( self->pMutex );
2601 
2602  log->Dump( FileMsg, "[0x%x@%s] Got state response for message %s",
2603  self.get(), self->pFileUrl->GetURL().c_str(),
2604  message->GetDescription().c_str() );
2605 
2606  //--------------------------------------------------------------------------
2607  // Since this message may be the last "in-the-fly" and no recovery
2608  // is done if messages are in the fly, we may need to trigger recovery
2609  //--------------------------------------------------------------------------
2610  self->pInTheFly.erase( message );
2611  RunRecovery( self );
2612 
2613  //--------------------------------------------------------------------------
2614  // Play with the actual response before returning it. This is a good
2615  // place to do caching in the future.
2616  //--------------------------------------------------------------------------
2617  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2618  switch( req->header.requestid )
2619  {
2620  //------------------------------------------------------------------------
2621  // Cache the stat response
2622  //------------------------------------------------------------------------
2623  case kXR_stat:
2624  {
2625  StatInfo *info = 0;
2626  response->Get( info );
2627  delete self->pStatInfo;
2628  self->pStatInfo = new StatInfo( *info );
2629  break;
2630  }
2631 
2632  //------------------------------------------------------------------------
2633  // Handle read response
2634  //------------------------------------------------------------------------
2635  case kXR_read:
2636  {
2637  ++self->pRCount;
2638  self->pRBytes += req->read.rlen;
2639  break;
2640  }
2641 
2642  //------------------------------------------------------------------------
2643  // Handle read response
2644  //------------------------------------------------------------------------
2645  case kXR_pgread:
2646  {
2647  ++self->pRCount;
2648  self->pRBytes += req->pgread.rlen;
2649  break;
2650  }
2651 
2652  //------------------------------------------------------------------------
2653  // Handle readv response
2654  //------------------------------------------------------------------------
2655  case kXR_readv:
2656  {
2657  ++self->pVRCount;
2658  size_t segs = req->header.dlen/sizeof(readahead_list);
2659  readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2660  for( size_t i = 0; i < segs; ++i )
2661  self->pVRBytes += dataChunk[i].rlen;
2662  self->pVSegs += segs;
2663  break;
2664  }
2665 
2666  //------------------------------------------------------------------------
2667  // Handle write response
2668  //------------------------------------------------------------------------
2669  case kXR_write:
2670  {
2671  ++self->pWCount;
2672  self->pWBytes += req->write.dlen;
2673  break;
2674  }
2675 
2676  //------------------------------------------------------------------------
2677  // Handle write response
2678  //------------------------------------------------------------------------
2679  case kXR_pgwrite:
2680  {
2681  ++self->pWCount;
2682  self->pWBytes += req->pgwrite.dlen;
2683  break;
2684  }
2685 
2686  //------------------------------------------------------------------------
2687  // Handle writev response
2688  //------------------------------------------------------------------------
2689  case kXR_writev:
2690  {
2691  ++self->pVWCount;
2692  size_t size = req->header.dlen/sizeof(readahead_list);
2693  XrdProto::write_list *wrtList =
2694  reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2695  for( size_t i = 0; i < size; ++i )
2696  self->pVWBytes += wrtList[i].wlen;
2697  break;
2698  }
2699  };
2700  }
2701 
2702  //------------------------------------------------------------------------
2704  //------------------------------------------------------------------------
2705  void FileStateHandler::Tick( time_t now )
2706  {
2707  if (pMutex.CondLock())
2708  {TimeOutRequests( now );
2709  pMutex.UnLock();
2710  }
2711  }
2712 
2713  //----------------------------------------------------------------------------
2714  // Declare timeout on requests being recovered
2715  //----------------------------------------------------------------------------
2717  {
2718  if( !pToBeRecovered.empty() )
2719  {
2720  Log *log = DefaultEnv::GetLog();
2721  log->Dump( FileMsg, "[0x%x@%s] Got a timer event", this,
2722  pFileUrl->GetURL().c_str() );
2723  RequestList::iterator it;
2725  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2726  {
2727  if( it->params.expires <= now )
2728  {
2729  jobMan->QueueJob( new ResponseJob(
2730  it->handler,
2732  0, it->params.hostList ) );
2733  it = pToBeRecovered.erase( it );
2734  }
2735  else
2736  ++it;
2737  }
2738  }
2739  }
2740 
2741  //----------------------------------------------------------------------------
2742  // Called in the child process after the fork
2743  //----------------------------------------------------------------------------
2745  {
2746  Log *log = DefaultEnv::GetLog();
2747 
2748  if( pFileState == Closed || pFileState == Error )
2749  return;
2750 
2751  if( (IsReadOnly() && pDoRecoverRead) ||
2752  (!IsReadOnly() && pDoRecoverWrite) )
2753  {
2754  log->Debug( FileMsg, "[0x%x@%s] Putting the file in recovery state in "
2755  "process %d", this, pFileUrl->GetURL().c_str(), getpid() );
2756  pFileState = Recovering;
2757  pInTheFly.clear();
2758  pToBeRecovered.clear();
2759  }
2760  else
2761  pFileState = Error;
2762  }
2763 
2764  //------------------------------------------------------------------------
2765  // Try other data server
2766  //------------------------------------------------------------------------
2767  XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, uint16_t timeout )
2768  {
2769  XrdSysMutexHelper scopedLock( self->pMutex );
2770 
2771  if( self->pFileState != Opened || !self->pLoadBalancer )
2772  return XRootDStatus( stError, errInvalidOp );
2773 
2774  self->pFileState = Recovering;
2775 
2776  Log *log = DefaultEnv::GetLog();
2777  log->Debug( FileMsg, "[0x%x@%s] Reopen file at next data server.",
2778  self.get(), self->pFileUrl->GetURL().c_str() );
2779 
2780  // merge CGI
2781  auto lbcgi = self->pLoadBalancer->GetParams();
2782  auto dtcgi = self->pDataServer->GetParams();
2783  MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2784  // update tried CGI
2785  auto itr = lbcgi.find( "tried" );
2786  if( itr == lbcgi.end() )
2787  lbcgi["tried"] = self->pDataServer->GetHostName();
2788  else
2789  {
2790  std::string tried = itr->second;
2791  tried += "," + self->pDataServer->GetHostName();
2792  lbcgi["tried"] = tried;
2793  }
2794  self->pLoadBalancer->SetParams( lbcgi );
2795 
2796  return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2797  }
2798 
2799  //------------------------------------------------------------------------
2800  // Generic implementation of xattr operation
2801  //------------------------------------------------------------------------
2802  template<typename T>
2803  Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2804  kXR_char subcode,
2805  kXR_char options,
2806  const std::vector<T> &attrs,
2807  ResponseHandler *handler,
2808  uint16_t timeout )
2809  {
2810  //--------------------------------------------------------------------------
2811  // Issue a new fattr request
2812  //--------------------------------------------------------------------------
2813  Message *msg;
2814  ClientFattrRequest *req;
2815  MessageUtils::CreateRequest( msg, req );
2816 
2817  req->requestid = kXR_fattr;
2818  req->subcode = subcode;
2819  req->numattr = attrs.size();
2820  req->options = options;
2821  memcpy( req->fhandle, self->pFileHandle, 4 );
2822  XRootDStatus st = MessageUtils::CreateXAttrBody( msg, attrs );
2823  if( !st.IsOK() ) return st;
2824 
2825  MessageSendParams params;
2826  params.timeout = timeout;
2827  params.followRedirects = false;
2828  params.stateful = true;
2830 
2832  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2833 
2834  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2835  }
2836 
2837  //----------------------------------------------------------------------------
2838  // Send a message to a host or put it in the recovery queue
2839  //----------------------------------------------------------------------------
2840  Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2841  const URL &url,
2842  Message *msg,
2843  ResponseHandler *handler,
2844  MessageSendParams &sendParams )
2845  {
2846  //--------------------------------------------------------------------------
2847  // Recovering
2848  //--------------------------------------------------------------------------
2849  if( self->pFileState == Recovering )
2850  {
2851  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2852  }
2853 
2854  //--------------------------------------------------------------------------
2855  // Trying to send
2856  //--------------------------------------------------------------------------
2857  if( self->pFileState == Opened )
2858  {
2859  msg->SetSessionId( self->pSessionId );
2860  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2861 
2862  //------------------------------------------------------------------------
2863  // Invalid session id means that the connection has been broken while we
2864  // were idle so we haven't been informed about this fact earlier.
2865  //------------------------------------------------------------------------
2866  if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2867  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2868 
2869  if( st.IsOK() )
2870  self->pInTheFly.insert(msg);
2871  else
2872  delete handler;
2873  return st;
2874  }
2875  return Status( stError, errInvalidOp );
2876  }
2877 
2878  //----------------------------------------------------------------------------
2879  // Check if the stateful error is recoverable
2880  //----------------------------------------------------------------------------
2881  bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
2882  {
2883  const auto recoverable_errors = {
2887  errInternal,
2888  errTlsError,
2890  };
2891 
2892  if (pDoRecoverRead || pDoRecoverWrite)
2893  for (const auto error : recoverable_errors)
2894  if (status.code == error)
2895  return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2896 
2897  return false;
2898  }
2899 
2900  //----------------------------------------------------------------------------
2901  // Check if the file is open for read only
2902  //----------------------------------------------------------------------------
2903  bool FileStateHandler::IsReadOnly() const
2904  {
2905  if( (pOpenFlags & kXR_open_read) && !(pOpenFlags & kXR_open_updt) &&
2906  !(pOpenFlags & kXR_open_apnd ) )
2907  return true;
2908  return false;
2909  }
2910 
2911  //----------------------------------------------------------------------------
2912  // Recover a message
2913  //----------------------------------------------------------------------------
2914  Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2915  RequestData rd,
2916  bool callbackOnFailure )
2917  {
2918  self->pFileState = Recovering;
2919 
2920  Log *log = DefaultEnv::GetLog();
2921  log->Dump( FileMsg, "[0x%x@%s] Putting message %s in the recovery list",
2922  self.get(), self->pFileUrl->GetURL().c_str(),
2923  rd.request->GetDescription().c_str() );
2924 
2925  Status st = RunRecovery( self );
2926  if( st.IsOK() )
2927  {
2928  self->pToBeRecovered.push_back( rd );
2929  return st;
2930  }
2931 
2932  if( callbackOnFailure )
2933  self->FailMessage( rd, st );
2934 
2935  return st;
2936  }
2937 
2938  //----------------------------------------------------------------------------
2939  // Run the recovery procedure if appropriate
2940  //----------------------------------------------------------------------------
2941  Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2942  {
2943  if( self->pFileState != Recovering )
2944  return Status();
2945 
2946  if( !self->pInTheFly.empty() )
2947  return Status();
2948 
2949  Log *log = DefaultEnv::GetLog();
2950  log->Debug( FileMsg, "[0x%x@%s] Running the recovery procedure", self.get(),
2951  self->pFileUrl->GetURL().c_str() );
2952 
2953  Status st;
2954  if( self->pStateRedirect )
2955  {
2956  SendClose( self, 0 );
2957  st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2958  delete self->pStateRedirect; self->pStateRedirect = 0;
2959  }
2960  else if( self->IsReadOnly() && self->pLoadBalancer )
2961  st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2962  else
2963  st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2964 
2965  if( !st.IsOK() )
2966  {
2967  self->pFileState = Error;
2968  self->pStatus = st;
2969  self->FailQueuedMessages( st );
2970  }
2971 
2972  return st;
2973  }
2974 
2975  //----------------------------------------------------------------------------
2976  // Send a close and ignore the response
2977  //----------------------------------------------------------------------------
2978  XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2979  uint16_t timeout )
2980  {
2981  Message *msg;
2982  ClientCloseRequest *req;
2983  MessageUtils::CreateRequest( msg, req );
2984 
2985  req->requestid = kXR_close;
2986  memcpy( req->fhandle, self->pFileHandle, 4 );
2987 
2989  msg->SetSessionId( self->pSessionId );
2991  [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
2992  MessageSendParams params;
2993  params.timeout = timeout;
2994  params.followRedirects = false;
2995  params.stateful = true;
2996 
2998 
2999  return self->IssueRequest( *self->pDataServer, msg, handler, params );
3000  }
3001 
3002  //----------------------------------------------------------------------------
3003  // Re-open the current file at a given server
3004  //----------------------------------------------------------------------------
3005  XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3006  const URL &url,
3007  uint16_t timeout )
3008  {
3009  Log *log = DefaultEnv::GetLog();
3010  log->Dump( FileMsg, "[0x%x@%s] Sending a recovery open command to %s",
3011  self.get(), self->pFileUrl->GetURL().c_str(), url.GetURL().c_str() );
3012 
3013  //--------------------------------------------------------------------------
3014  // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3015  // procedure to delete a file that has been partially updated or fail it
3016  // because a partially uploaded file already exists.
3017  //--------------------------------------------------------------------------
3018  if( self->pOpenFlags & kXR_delete)
3019  {
3020  self->pOpenFlags &= ~kXR_delete;
3021  self->pOpenFlags |= kXR_open_updt;
3022  }
3023 
3024  self->pOpenFlags &= ~kXR_new;
3025 
3026  Message *msg;
3027  ClientOpenRequest *req;
3028  URL u = url;
3029 
3030  if( url.GetPath().empty() )
3031  u.SetPath( self->pFileUrl->GetPath() );
3032 
3033  std::string path = u.GetPathWithFilteredParams();
3034  MessageUtils::CreateRequest( msg, req, path.length() );
3035 
3036  req->requestid = kXR_open;
3037  req->mode = self->pOpenMode;
3038  req->options = self->pOpenFlags;
3039  req->dlen = path.length();
3040  msg->Append( path.c_str(), path.length(), 24 );
3041 
3042  // create a new reopen handler
3043  // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3044  // until we know that 'SendMessage' was successful)
3045  OpenHandler *openHandler = new OpenHandler( self, 0 );
3046  MessageSendParams params; params.timeout = timeout;
3049 
3050  //--------------------------------------------------------------------------
3051  // Issue the open request
3052  //--------------------------------------------------------------------------
3053  XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3054 
3055  // if there was a problem destroy the open handler
3056  if( !st.IsOK() )
3057  {
3058  delete openHandler;
3059  self->pStatus = st;
3060  self->pFileState = Closed;
3061  }
3062  return st;
3063  }
3064 
3065  //------------------------------------------------------------------------
3066  // Fail a message
3067  //------------------------------------------------------------------------
3068  void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3069  {
3070  Log *log = DefaultEnv::GetLog();
3071  log->Dump( FileMsg, "[0x%x@%s] Failing message %s with %s",
3072  this, pFileUrl->GetURL().c_str(),
3073  rd.request->GetDescription().c_str(),
3074  status.ToStr().c_str() );
3075 
3076  StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3077  if( !sh )
3078  {
3079  Log *log = DefaultEnv::GetLog();
3080  log->Error( FileMsg, "[0x%x@%s] Internal error while recovering %s",
3081  this, pFileUrl->GetURL().c_str(),
3082  rd.request->GetDescription().c_str() );
3083  return;
3084  }
3085 
3087  ResponseHandler *userHandler = sh->GetUserHandler();
3088  jobMan->QueueJob( new ResponseJob(
3089  userHandler,
3090  new XRootDStatus( status ),
3091  0, rd.params.hostList ) );
3092 
3093  delete sh;
3094  }
3095 
3096  //----------------------------------------------------------------------------
3097  // Fail queued messages
3098  //----------------------------------------------------------------------------
3099  void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3100  {
3101  RequestList::iterator it;
3102  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3103  FailMessage( *it, status );
3104  pToBeRecovered.clear();
3105  }
3106 
3107  //------------------------------------------------------------------------
3108  // Re-send queued messages
3109  //------------------------------------------------------------------------
3110  void FileStateHandler::ReSendQueuedMessages()
3111  {
3112  RequestList::iterator it;
3113  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3114  {
3115  it->request->SetSessionId( pSessionId );
3116  ReWriteFileHandle( it->request );
3117  XRootDStatus st = IssueRequest( *pDataServer, it->request,
3118  it->handler, it->params );
3119  if( !st.IsOK() )
3120  FailMessage( *it, st );
3121  }
3122  pToBeRecovered.clear();
3123  }
3124 
3125  //------------------------------------------------------------------------
3126  // Re-write file handle
3127  //------------------------------------------------------------------------
3128  void FileStateHandler::ReWriteFileHandle( Message *msg )
3129  {
3131  switch( hdr->requestid )
3132  {
3133  case kXR_read:
3134  {
3136  memcpy( req->fhandle, pFileHandle, 4 );
3137  break;
3138  }
3139  case kXR_write:
3140  {
3142  memcpy( req->fhandle, pFileHandle, 4 );
3143  break;
3144  }
3145  case kXR_sync:
3146  {
3148  memcpy( req->fhandle, pFileHandle, 4 );
3149  break;
3150  }
3151  case kXR_truncate:
3152  {
3154  memcpy( req->fhandle, pFileHandle, 4 );
3155  break;
3156  }
3157  case kXR_readv:
3158  {
3160  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3161  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3162  memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3163  break;
3164  }
3165  case kXR_writev:
3166  {
3167  ClientWriteVRequest *req =
3168  reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3169  XrdProto::write_list *wrtList =
3170  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3171  size_t size = req->dlen / sizeof(XrdProto::write_list);
3172  for( size_t i = 0; i < size; ++i )
3173  memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3174  break;
3175  }
3176  case kXR_pgread:
3177  {
3179  memcpy( req->fhandle, pFileHandle, 4 );
3180  break;
3181  }
3182  case kXR_pgwrite:
3183  {
3185  memcpy( req->fhandle, pFileHandle, 4 );
3186  break;
3187  }
3188  }
3189 
3190  Log *log = DefaultEnv::GetLog();
3191  log->Dump( FileMsg, "[0x%x@%s] Rewritten file handle for %s to 0x%x",
3192  this, pFileUrl->GetURL().c_str(), msg->GetDescription().c_str(),
3193  *((uint32_t*)pFileHandle) );
3195  }
3196 
3197  //----------------------------------------------------------------------------
3198  // Dispatch monitoring information on close
3199  //----------------------------------------------------------------------------
3200  void FileStateHandler::MonitorClose( const XRootDStatus *status )
3201  {
3203  if( mon )
3204  {
3206  i.file = pFileUrl;
3207  i.oTOD = pOpenTime;
3208  gettimeofday( &i.cTOD, 0 );
3209  i.rBytes = pRBytes;
3210  i.vrBytes = pVRBytes;
3211  i.wBytes = pWBytes;
3212  i.vwBytes = pVWBytes;
3213  i.vSegs = pVSegs;
3214  i.rCount = pRCount;
3215  i.vCount = pVRCount;
3216  i.wCount = pWCount;
3217  i.status = status;
3218  mon->Event( Monitor::EvClose, &i );
3219  }
3220  }
3221 
3222  XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3223  Message *msg,
3224  ResponseHandler *handler,
3225  MessageSendParams &sendParams )
3226  {
3227  // first handle Metalinks
3228  if( pUseVirtRedirector && url.IsMetalink() )
3229  return MessageUtils::RedirectMessage( url, msg, handler,
3230  sendParams, pLFileHandler );
3231 
3232  // than local file access
3233  if( url.IsLocalFile() )
3234  return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3235 
3236  // and finally ordinary XRootD requests
3237  return MessageUtils::SendMessage( url, msg, handler,
3238  sendParams, pLFileHandler );
3239  }
3240 
3241  //------------------------------------------------------------------------
3242  // Send a write request with payload being stored in a kernel buffer
3243  //------------------------------------------------------------------------
3244  XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3245  uint64_t offset,
3246  uint32_t length,
3247  std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3248  ResponseHandler *handler,
3249  uint16_t timeout )
3250  {
3251  //--------------------------------------------------------------------------
3252  // Create the write request
3253  //--------------------------------------------------------------------------
3254  XrdSysMutexHelper scopedLock( self->pMutex );
3255 
3256  if( self->pFileState != Opened && self->pFileState != Recovering )
3257  return XRootDStatus( stError, errInvalidOp );
3258 
3259  Log *log = DefaultEnv::GetLog();
3260  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
3261  "%s", self.get(), self->pFileUrl->GetURL().c_str(),
3262  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3263 
3264  Message *msg;
3265  ClientWriteRequest *req;
3266  MessageUtils::CreateRequest( msg, req );
3267 
3268  req->requestid = kXR_write;
3269  req->offset = offset;
3270  req->dlen = length;
3271  memcpy( req->fhandle, self->pFileHandle, 4 );
3272 
3273  MessageSendParams params;
3274  params.timeout = timeout;
3275  params.followRedirects = false;
3276  params.stateful = true;
3277  params.kbuff = kbuff.release();
3278  params.chunkList = new ChunkList();
3279 
3281 
3283  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3284 
3285  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3286  }
3287 }
kXR_unt16 requestid
Definition: XProtocol.hh:479
kXR_unt16 requestid
Definition: XProtocol.hh:630
kXR_unt16 requestid
Definition: XProtocol.hh:804
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
#define kXR_suppgrw
Definition: XProtocol.hh:1171
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_char fhandle[4]
Definition: XProtocol.hh:780
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:859
kXR_char fhandle[4]
Definition: XProtocol.hh:805
kXR_char fhandle[4]
Definition: XProtocol.hh:769
kXR_int64 offset
Definition: XProtocol.hh:646
kXR_unt16 requestid
Definition: XProtocol.hh:644
@ kXR_virtReadv
Definition: XProtocol.hh:150
kXR_unt16 options
Definition: XProtocol.hh:481
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:860
kXR_unt16 requestid
Definition: XProtocol.hh:228
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_open_read
Definition: XProtocol.hh:456
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_open_apnd
Definition: XProtocol.hh:462
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientRequestHdr header
Definition: XProtocol.hh:844
kXR_char fhandle[4]
Definition: XProtocol.hh:509
#define kXR_recoverWrts
Definition: XProtocol.hh:1163
kXR_unt16 infotype
Definition: XProtocol.hh:631
kXR_char fhandle[4]
Definition: XProtocol.hh:645
kXR_char fhandle[4]
Definition: XProtocol.hh:229
kXR_unt16 requestid
Definition: XProtocol.hh:157
kXR_char fhandle[4]
Definition: XProtocol.hh:633
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
struct ClientReadRequest read
Definition: XProtocol.hh:865
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_unt16 requestid
Definition: XProtocol.hh:766
kXR_int32 dlen
Definition: XProtocol.hh:483
kXR_char fhandle[4]
Definition: XProtocol.hh:792
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_unt16 requestid
Definition: XProtocol.hh:508
kXR_unt16 requestid
Definition: XProtocol.hh:779
kXR_char fhandle[4]
Definition: XProtocol.hh:204
kXR_int64 offset
Definition: XProtocol.hh:661
#define kXR_PROTPGRWVERSION
Definition: XProtocol.hh:73
kXR_int64 offset
Definition: XProtocol.hh:806
struct ClientWriteRequest write
Definition: XProtocol.hh:874
kXR_int32 rlen
Definition: XProtocol.hh:647
kXR_unt16 requestid
Definition: XProtocol.hh:670
@ kXR_Qopaqug
Definition: XProtocol.hh:625
@ kXR_Qvisa
Definition: XProtocol.hh:622
kXR_int32 dlen
Definition: XProtocol.hh:159
unsigned char kXR_char
Definition: XPtypes.hh:65
static int mapError(int rc)
Definition: XProtocol.hh:1358
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
Definition: XrdClBuffer.hh:164
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
const std::string & GetDescription() const
Get the description of the message.
Definition: XrdClMessage.hh:95
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:94
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:451
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:388
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
Definition: XrdClURL.cc:317
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
void SetPath(const std::string &path)
Set the path.
Definition: XrdClURL.hh:220
bool IsLocalFile() const
Definition: XrdClURL.cc:460
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:239
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:212
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
Definition: XrdClUtils.hh:235
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInProgress
Definition: XrdClStatus.hh:59
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint64_t FileMsg
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errSocketDisconnected
Definition: XrdClStatus.hh:74
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition: XProtocol.hh:298
kXR_char fhandle[4]
Definition: XProtocol.hh:288
kXR_unt16 requestid
Definition: XProtocol.hh:287
kXR_unt16 requestid
Definition: XProtocol.hh:818
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted