xrootd
XrdClEcHandler.hh
Go to the documentation of this file.
1 /*
2  * XrdClEcHandler.hh
3  *
4  * Created on: 23 Mar 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9 #define SRC_XRDCL_XRDCLECHANDLER_HH_
10 
12 #include "XrdCl/XrdClUtils.hh"
15 
16 #include "XrdEc/XrdEcReader.hh"
17 #include "XrdEc/XrdEcStrmWriter.hh"
18 
19 #include <memory>
20 
21 namespace XrdCl
22 {
23  class EcHandler : public FilePlugIn
24  {
25  public:
26  EcHandler( const URL &redir,
28  std::unique_ptr<CheckSumHelper> cksHelper ) : redir( redir ),
29  fs( redir, false ),
30  objcfg( objcfg ),
31  curroff( 0 ),
32  cksHelper( std::move( cksHelper ) )
33  {
35  }
36 
37  virtual ~EcHandler()
38  {
39  }
40 
41  XRootDStatus Open( uint16_t flags,
42  ResponseHandler *handler,
43  uint16_t timeout )
44  {
45  if( ( flags & OpenFlags::Write ) || ( flags & OpenFlags::Update ) )
46  {
47  if( !( flags & OpenFlags::New ) || // it has to be a new file
48  ( flags & OpenFlags::Delete ) || // truncation is not supported
49  ( flags & OpenFlags::Read ) ) // write + read is not supported
51 
52  if( objcfg->plgr.empty() )
53  {
55  if( !st.IsOK() ) return st;
56  }
57  writer.reset( new XrdEc::StrmWriter( *objcfg ) );
58  writer->Open( handler, timeout );
59  return XRootDStatus();
60  }
61 
62  if( flags & OpenFlags::Read )
63  {
64  if( flags & OpenFlags::Write )
66 
67  if( objcfg->plgr.empty() )
68  {
70  if( !st.IsOK() ) return st;
71  }
72  reader.reset( new XrdEc::Reader( *objcfg ) );
73  reader->Open( handler, timeout );
74  return XRootDStatus();
75  }
76 
78  }
79 
80  XRootDStatus Open( const std::string &url,
81  OpenFlags::Flags flags,
82  Access::Mode mode,
83  ResponseHandler *handler,
84  uint16_t timeout )
85  {
86  (void)url; (void)mode;
87  return Open( flags, handler, timeout );
88  }
89 
90 
91  //------------------------------------------------------------------------
93  //------------------------------------------------------------------------
95  uint16_t timeout )
96  {
97  if( writer )
98  {
99  writer->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
100  {
101  writer.reset();
102  if( st->IsOK() && bool( cksHelper ) )
103  {
104  std::string commit = redir.GetPath()
105  + "?xrdec.objid=" + objcfg->obj
106  + "&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
107  if( cksHelper )
108  {
109  std::string ckstype = cksHelper->GetType();
110  std::string cksval;
111  auto st = cksHelper->GetCheckSum( cksval, ckstype );
112  if( !st.IsOK() )
113  {
114  handler->HandleResponse( new XRootDStatus( st ), nullptr );
115  return;
116  }
117  commit += "&xrdec.cksum=" + cksval;
118  }
119  Buffer arg; arg.FromString( commit );
120  auto st = fs.Query( QueryCode::OpaqueFile, arg, handler );
121  if( !st.IsOK() ) handler->HandleResponse( new XRootDStatus( st ), nullptr );
122  return;
123  }
124  handler->HandleResponse( st, rsp );
125  } ), timeout );
126  return XRootDStatus();
127  }
128 
129  if( reader )
130  {
131  reader->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
132  {
133  reader.reset();
134  handler->HandleResponse( st, rsp );
135  } ), timeout );
136  return XRootDStatus();
137  }
138 
140  }
141 
142  //------------------------------------------------------------------------
144  //------------------------------------------------------------------------
145  XRootDStatus Stat( bool force,
146  ResponseHandler *handler,
147  uint16_t timeout )
148  {
149 
150  if( !objcfg->nomtfile )
151  return fs.Stat( redir.GetPath(), handler, timeout );
152 
153  if( !force && statcache )
154  {
155  auto rsp = StatRsp( statcache->GetSize() );
156  Schedule( handler, rsp );
157  return XRootDStatus();
158  }
159 
160  if( writer )
161  {
162  statcache.reset( new StatInfo() );
163  statcache->SetSize( writer->GetSize() );
164  auto rsp = StatRsp( statcache->GetSize() );
165  Schedule( handler, rsp );
166  return XRootDStatus();
167  }
168 
169  if( reader )
170  {
171  statcache.reset( new StatInfo() );
172  statcache->SetSize( reader->GetSize() );
173  auto rsp = StatRsp( statcache->GetSize() );
174  Schedule( handler, rsp );
175  return XRootDStatus();
176  }
177 
178  return XRootDStatus( stError, errInvalidOp, 0, "File not open." );
179  }
180 
181  //------------------------------------------------------------------------
183  //------------------------------------------------------------------------
184  XRootDStatus Read( uint64_t offset,
185  uint32_t size,
186  void *buffer,
187  ResponseHandler *handler,
188  uint16_t timeout )
189  {
190  if( !reader ) return XRootDStatus( stError, errInternal );
191 
192  reader->Read( offset, size, buffer, handler, timeout );
193  return XRootDStatus();
194  }
195 
196  //------------------------------------------------------------------------
198  //------------------------------------------------------------------------
199  XRootDStatus Write( uint64_t offset,
200  uint32_t size,
201  const void *buffer,
202  ResponseHandler *handler,
203  uint16_t timeout )
204  {
205  if( cksHelper )
206  cksHelper->Update( buffer, size );
207 
208  if( !writer ) return XRootDStatus( stError, errInternal );
209  if( offset != curroff ) return XRootDStatus( stError, errNotSupported );
210  writer->Write( size, buffer, handler );
211  curroff += size;
212  return XRootDStatus();
213  }
214 
215  //------------------------------------------------------------------------
217  //------------------------------------------------------------------------
218  bool IsOpen() const
219  {
220  return writer || reader;
221  }
222 
223  private:
224 
226  {
227  LocationInfo *info = nullptr;
228  XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, info );
229  std::unique_ptr<LocationInfo> ptr( info );
230  if( !st.IsOK() ) return st;
231  if( info->GetSize() < objcfg->nbchunks )
232  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
233  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
234  shuffle (info->Begin(), info->End(), std::default_random_engine(seed));
235  for( size_t i = 0; i < objcfg->nbchunks; ++i )
236  {
237  auto &location = info->At( i );
238  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
239  }
240  return XRootDStatus();
241  }
242 
243  inline XRootDStatus LoadPlacement( const std::string &path )
244  {
245  LocationInfo *info = nullptr;
246  XRootDStatus st = fs.DeepLocate( path, OpenFlags::None, info );
247  std::unique_ptr<LocationInfo> ptr( info );
248  if( !st.IsOK() ) return st;
249  if( info->GetSize() < objcfg->nbdata )
250  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
251  for( size_t i = 0; i < info->GetSize(); ++i )
252  {
253  auto &location = info->At( i );
254  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
255  }
256  return XRootDStatus();
257  }
258 
259  inline static AnyObject* StatRsp( uint64_t size )
260  {
261  StatInfo *info = new StatInfo();
262  info->SetSize( size );
263  AnyObject *rsp = new AnyObject();
264  rsp->Set( info );
265  return rsp;
266  }
267 
268  inline static void Schedule( ResponseHandler *handler, AnyObject *rsp )
269  {
270  ResponseJob *job = new ResponseJob( handler, new XRootDStatus(), rsp, nullptr );
272  }
273 
276  std::unique_ptr<XrdEc::ObjCfg> objcfg;
277  std::unique_ptr<XrdEc::StrmWriter> writer;
278  std::unique_ptr<XrdEc::Reader> reader;
279  uint64_t curroff;
280  std::unique_ptr<CheckSumHelper> cksHelper;
281  std::unique_ptr<StatInfo> statcache;
282 
283  };
284 
285  //----------------------------------------------------------------------------
287  //----------------------------------------------------------------------------
289  {
290  public:
291  //------------------------------------------------------------------------
293  //------------------------------------------------------------------------
294  EcPlugInFactory( uint8_t nbdta, uint8_t nbprt, uint64_t chsz,
295  std::vector<std::string> && plgr ) :
296  nbdta( nbdta ), nbprt( nbprt ), chsz( chsz ), plgr( std::move( plgr ) )
297  {
298  }
299 
300  //------------------------------------------------------------------------
302  //------------------------------------------------------------------------
304  {
305  }
306 
307  //------------------------------------------------------------------------
309  //------------------------------------------------------------------------
310  virtual FilePlugIn *CreateFile( const std::string &u )
311  {
312  URL url( u );
313  XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( url.GetPath(), nbdta, nbprt,
314  chsz, false, true );
315  objcfg->plgr = std::move( plgr );
316  return new EcHandler( url, objcfg, nullptr );
317  }
318 
319  //------------------------------------------------------------------------
321  //------------------------------------------------------------------------
322  virtual FileSystemPlugIn *CreateFileSystem( const std::string &url )
323  {
324  return nullptr;
325  }
326 
327  private:
328  uint8_t nbdta;
329  uint8_t nbprt;
330  uint64_t chsz;
331  std::vector<std::string> plgr;
332  };
333 
334  EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl );
335 
336 } /* namespace XrdCl */
337 
338 #endif /* SRC_XRDCL_XRDCLECHANDLER_HH_ */
339 
Definition: XrdClAnyObject.hh:32
Implementation dependent.
Definition: XrdClFileSystem.hh:58
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
XRootDStatus Stat(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
std::vector< std::string > plgr
Definition: XrdClEcHandler.hh:331
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: XrdOucJson.hh:26358
virtual ~EcHandler()
Definition: XrdClEcHandler.hh:37
Object stat info.
Definition: XrdClXRootDResponses.hh:399
Definition: XrdClEcHandler.hh:23
std::unique_ptr< XrdEc::ObjCfg > objcfg
Definition: XrdClEcHandler.hh:276
Open only for writing.
Definition: XrdClFileSystem.hh:97
Call the user callback.
Definition: XrdClResponseJob.hh:30
Definition: XrdOucJson.hh:4516
Path location info.
Definition: XrdClXRootDResponses.hh:43
bool IsOK() const
We&#39;re fine.
Definition: XrdClStatus.hh:123
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:145
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
bool enable_plugins
Definition: XrdEcConfig.hh:77
std::unique_ptr< StatInfo > statcache
Definition: XrdClEcHandler.hh:281
Definition: XrdEcStrmWriter.hh:52
XRootDStatus LoadPlacement(const std::string &path)
Definition: XrdClEcHandler.hh:243
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:94
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:80
Iterator End()
Get the location end iterator.
Definition: XrdClXRootDResponses.hh:184
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:212
uint64_t chsz
Definition: XrdClEcHandler.hh:330
uint64_t curroff
Definition: XrdClEcHandler.hh:279
EcPlugInFactory(uint8_t nbdta, uint8_t nbprt, uint64_t chsz, std::vector< std::string > &&plgr)
Constructor.
Definition: XrdClEcHandler.hh:294
An interface for file plug-ins.
Definition: XrdClPlugInInterface.hh:38
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:199
uint32_t GetSize() const
Get number of locations.
Definition: XrdClXRootDResponses.hh:152
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, std::unique_ptr< CheckSumHelper > cksHelper)
Definition: XrdClEcHandler.hh:26
std::unique_ptr< XrdEc::Reader > reader
Definition: XrdClEcHandler.hh:278
std::unique_ptr< XrdEc::StrmWriter > writer
Definition: XrdClEcHandler.hh:277
Open for reading and writing.
Definition: XrdClFileSystem.hh:96
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual ~EcPlugInFactory()
Destructor.
Definition: XrdClEcHandler.hh:303
void Set(Type object, bool own=true)
Definition: XrdClAnyObject.hh:59
XRootDStatus LoadPlacement()
Definition: XrdClEcHandler.hh:225
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
bool IsOpen() const
Definition: XrdClEcHandler.hh:218
uint8_t nbprt
Definition: XrdClEcHandler.hh:329
j template void())
Definition: XrdOucJson.hh:4121
Location & At(uint32_t index)
Get the location at index.
Definition: XrdClXRootDResponses.hh:160
Plugin factory.
Definition: XrdClPlugInInterface.hh:548
uint8_t nbdta
Definition: XrdClEcHandler.hh:328
static AnyObject * StatRsp(uint64_t size)
Definition: XrdClEcHandler.hh:259
virtual FileSystemPlugIn * CreateFileSystem(const std::string &url)
Create a file system plug-in for the given URL.
Definition: XrdClEcHandler.hh:322
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:41
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
Handle an async response.
Definition: XrdClXRootDResponses.hh:1116
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1146
virtual FilePlugIn * CreateFile(const std::string &u)
Create a file plug-in for the given URL.
Definition: XrdClEcHandler.hh:310
XRootDStatus DeepLocate(const std::string &path, OpenFlags::Flags flags, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdEcReader.hh:57
Open only for reading.
Definition: XrdClFileSystem.hh:95
URL representation.
Definition: XrdClURL.hh:30
JobManager * GetJobManager()
Get the job manager object user by the post master.
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
Mode
Access mode.
Definition: XrdClFileSystem.hh:121
Definition: XrdClFileSystem.hh:86
URL redir
Definition: XrdClEcHandler.hh:274
Send file/filesystem queries to an XRootD cluster.
Definition: XrdClFileSystem.hh:202
Nothing.
Definition: XrdClFileSystem.hh:77
Definition: XrdClFileSystem.hh:80
std::unique_ptr< CheckSumHelper > cksHelper
Definition: XrdClEcHandler.hh:280
Plugin factory.
Definition: XrdClEcHandler.hh:288
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
Definition: XrdEcObjCfg.hh:32
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:184
static PostMaster * GetPostMaster()
Get default post master.
FileSystem fs
Definition: XrdClEcHandler.hh:275
Flags
Open flags, may be or&#39;d when appropriate.
Definition: XrdClFileSystem.hh:75
void SetSize(uint64_t size)
Set size.
static void Schedule(ResponseHandler *handler, AnyObject *rsp)
Definition: XrdClEcHandler.hh:268
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
An interface for file plug-ins.
Definition: XrdClPlugInInterface.hh:283
Iterator Begin()
Get the location begin iterator.
Definition: XrdClXRootDResponses.hh:168
Binary blob representation.
Definition: XrdClBuffer.hh:33