XRootD
XrdPfcFile.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_FILE_HH__
2 #define __XRDPFC_FILE_HH__
3 //----------------------------------------------------------------------------------
4 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5 // Author: Alja Mrak-Tadel, Matevz Tadel
6 //----------------------------------------------------------------------------------
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //----------------------------------------------------------------------------------
20 
21 #include "XrdPfcTypes.hh"
22 #include "XrdPfcInfo.hh"
23 #include "XrdPfcStats.hh"
24 
26 
27 #include "XrdOuc/XrdOucCache.hh"
28 #include "XrdOuc/XrdOucIOVec.hh"
29 
30 #include <functional>
31 #include <map>
32 #include <set>
33 #include <string>
34 
35 class XrdJob;
36 class XrdOucIOVec;
37 
38 namespace XrdCl
39 {
40 class Log;
41 }
42 
43 namespace XrdPfc
44 {
45 class File;
46 class BlockResponseHandler;
47 class DirectResponseHandler;
48 class IO;
49 
50 struct ReadVBlockListRAM;
51 struct ReadVChunkListRAM;
52 struct ReadVBlockListDisk;
53 struct ReadVChunkListDisk;
54 
55 struct ReadReqRH : public XrdOucCacheIOCB
56 {
57  int m_expected_size = 0;
58  int m_n_chunks = 0; // Only set for ReadV().
59  unsigned short m_seq_id;
60  XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
61 
62  ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
63  m_seq_id(sid), m_iocb(iocb)
64  {}
65 };
66 
67 // -------------------------------------------------------------
68 
70 {
71  IO *m_io;
72  ReadReqRH *m_rh; // Internal callback created in IO::Read().
73 
74  long long m_bytes_read = 0;
75  int m_error_cond = 0; // to be set to -errno
77 
78  int m_n_chunk_reqs = 0;
79  bool m_sync_done = false;
80  bool m_direct_done = true;
81 
82  ReadRequest(IO *io, ReadReqRH *rh) :
83  m_io(io), m_rh(rh)
84  {}
85 
86  void update_error_cond(int ec) { if (m_error_cond == 0 ) m_error_cond = ec; }
87 
88  bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
89  int return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; }
90 };
91 
92 // -------------------------------------------------------------
93 
95 {
97  char *m_buf; // Where to place the data chunk.
98  long long m_off; // Offset *within* the corresponding block.
99  int m_size; // Size of the data chunk.
100 
101  ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
102  m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
103  {}
104 };
105 
106 using vChunkRequest_t = std::vector<ChunkRequest>;
107 using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
108 
109 // ================================================================
110 
111 class Block
112 {
113 public:
115  IO *m_io; // IO that handled current request, used for == / != comparisons only
116  void *m_req_id; // Identity of requestor -- used for stats.
117 
118  char *m_buff;
119  long long m_offset;
120  int m_size;
122  int m_refcnt;
123  int m_errno; // stores negative errno
129 
131 
132  Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
133  bool m_prefetch, bool cks_net) :
134  m_file(f), m_io(io), m_req_id(rid),
135  m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
137  m_req_cksum_net(cks_net), m_n_cksum_errors(0)
138  {}
139 
140  char* get_buff() const { return m_buff; }
141  int get_size() const { return m_size; }
142  int get_req_size() const { return m_req_size; }
143  long long get_offset() const { return m_offset; }
144 
145  File* get_file() const { return m_file; }
146  IO* get_io() const { return m_io; }
147  void* get_req_id() const { return m_req_id; }
148 
149  bool is_finished() const { return m_downloaded || m_errno != 0; }
150  bool is_ok() const { return m_downloaded; }
151  bool is_failed() const { return m_errno != 0; }
152 
153  void set_downloaded() { m_downloaded = true; }
154  void set_error(int err) { m_errno = err; }
155  int get_error() const { return m_errno; }
156 
157  void reset_error_and_set_io(IO *io, void *rid)
158  {
159  m_errno = 0;
160  m_io = io;
161  m_req_id = rid;
162  }
163 
164  bool req_cksum_net() const { return m_req_cksum_net; }
165  bool has_cksums() const { return ! m_cksum_vec.empty(); }
169 };
170 
171 using BlockList_t = std::list<Block*>;
172 using BlockList_i = std::list<Block*>::iterator;
173 
174 // ================================================================
175 
177 {
178 public:
180 
182 
183  void Done(int result) override;
184 };
185 
186 // ----------------------------------------------------------------
187 
189 {
190 public:
195  int m_bytes_read = 0;
196  int m_errno = 0;
197 
198  DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
199  m_file(file), m_read_req(rreq), m_to_wait(to_wait)
200  {}
201 
202  void Done(int result) override;
203 };
204 
205 // ================================================================
206 
207 class File
208 {
209  friend class BlockResponseHandler;
210  friend class DirectResponseHandler;
211 public:
212  // Constructor and Open() are private.
213 
215  static File* FileOpen(const std::string &path, long long offset, long long fileSize);
216 
218  ~File();
219 
222 
224  void BlocksRemovedFromWriteQ(std::list<Block*>&);
225 
227  int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
228 
230  int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
231 
232  //----------------------------------------------------------------------
234  //----------------------------------------------------------------------
235  void ioUpdated(IO *io);
236 
237  //----------------------------------------------------------------------
240  //----------------------------------------------------------------------
241  bool ioActive(IO *io);
242 
243  //----------------------------------------------------------------------
246  //----------------------------------------------------------------------
248 
249  //----------------------------------------------------------------------
252  //----------------------------------------------------------------------
253  bool FinalizeSyncBeforeExit();
254 
255  //----------------------------------------------------------------------
257  //----------------------------------------------------------------------
258  void Sync();
259 
260  void WriteBlockToDisk(Block* b);
261 
262  void Prefetch();
263 
264  float GetPrefetchScore() const;
265 
267  const char* lPath() const;
268 
269  const std::string& GetLocalPath() const { return m_filename; }
270 
271  XrdSysError* GetLog();
273 
274  long long GetFileSize() const { return m_file_size; }
275 
276  void AddIO(IO *io);
279  void RemoveIO(IO *io);
280 
281  std::string GetRemoteLocations() const;
282  const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
283  size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
284  int GetBlockSize() const { return m_cfi.GetBufferSize(); }
285  int GetNBlocks() const { return m_cfi.GetNBlocks(); }
286  int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
287  const Stats& RefStats() const { return m_stats; }
288 
289  // These three methods are called under Cache's m_active lock
290  int get_ref_cnt() { return m_ref_cnt; }
291  int inc_ref_cnt() { return ++m_ref_cnt; }
292  int dec_ref_cnt() { return --m_ref_cnt; }
293 
295  bool is_in_emergency_shutdown() { return m_in_shutdown; }
296 
297 private:
299  File(const std::string &path, long long offset, long long fileSize);
300 
302  bool Open();
303 
304  static const char *m_traceID;
305 
306  int m_ref_cnt;
307 
308  XrdOssDF *m_data_file;
309  XrdOssDF *m_info_file;
310  Info m_cfi;
311 
312  const std::string m_filename;
313  const long long m_offset;
314  const long long m_file_size;
315 
316  // IO objects attached to this file.
317 
318  typedef std::set<IO*> IoSet_t;
319  typedef IoSet_t::iterator IoSet_i;
320 
321  IoSet_t m_io_set;
322  IoSet_i m_current_io;
323  int m_ios_in_detach;
324 
325  // FSync
326 
327  std::vector<int> m_writes_during_sync;
328  int m_non_flushed_cnt;
329  bool m_in_sync;
330  bool m_detach_time_logged;
331  bool m_in_shutdown;
332 
333  // Block state and management
334 
335  typedef std::list<int> IntList_t;
336  typedef IntList_t::iterator IntList_i;
337 
338  typedef std::map<int, Block*> BlockMap_t;
339  typedef BlockMap_t::iterator BlockMap_i;
340 
341  BlockMap_t m_block_map;
342  XrdSysCondVar m_state_cond;
343  long long m_block_size;
344  int m_num_blocks;
345 
346  // Stats and ResourceMonitor interface
347 
348  Stats m_stats;
349  Stats m_delta_stats;
350  long long m_st_blocks;
351  long long m_resmon_report_threshold;
352  int m_resmon_token;
353 
354  void check_delta_stats();
355  void report_and_merge_delta_stats();
356 
357  std::set<std::string> m_remote_locations;
358  void insert_remote_location(const std::string &loc);
359 
360  // Prefetch
361 
362  enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
363 
364  PrefetchState_e m_prefetch_state;
365 
366  int m_prefetch_read_cnt;
367  int m_prefetch_hit_cnt;
368  float m_prefetch_score; // cached
369 
370  void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
371  void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
372  void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
373 
374  // Helpers
375 
376  bool overlap(int blk, // block to query
377  long long blk_size, //
378  long long req_off, // offset of user request
379  int req_size, // size of user request
380  // output:
381  long long &off, // offset in user buffer
382  long long &blk_off, // offset in block
383  int &size);
384 
385  // Read & ReadV
386 
387  Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
388 
389  void ProcessBlockRequest (Block *b);
390  void ProcessBlockRequests(BlockList_t& blks);
391 
392  void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
393 
394  int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
395 
396  int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
397  ReadReqRH *rh, const char *tpfx);
398 
399  void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
400  void ProcessBlockError(Block *b, ReadRequest *rreq);
401  void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
402  void FinalizeReadRequest(ReadRequest *rreq);
403 
404  void ProcessBlockResponse(Block *b, int res);
405 
406  // Block management
407 
408  void inc_ref_count(Block* b);
409  void dec_ref_count(Block* b, int count = 1);
410  void free_block(Block*);
411 
412  bool select_current_io_or_disable_prefetching(bool skip_current);
413 
414  int offsetIdx(int idx) const;
415 };
416 
417 //------------------------------------------------------------------------------
418 
419 inline void File::inc_ref_count(Block* b)
420 {
421  // Method always called under lock.
422  b->m_refcnt++;
423 }
424 
425 //------------------------------------------------------------------------------
426 
427 inline void File::dec_ref_count(Block* b, int count)
428 {
429  // Method always called under lock.
430  assert(b->is_finished());
431  b->m_refcnt -= count;
432  assert(b->m_refcnt >= 0);
433 
434  if (b->m_refcnt == 0)
435  {
436  free_block(b);
437  }
438 }
439 
440 }
441 
442 #endif
XrdOucString File
Definition: XrdJob.hh:43
void Done(int result) override
Definition: XrdPfcFile.cc:1614
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:168
int get_size() const
Definition: XrdPfcFile.hh:141
int get_error() const
Definition: XrdPfcFile.hh:155
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:167
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
Definition: XrdPfcFile.hh:132
void * get_req_id() const
Definition: XrdPfcFile.hh:147
long long get_offset() const
Definition: XrdPfcFile.hh:143
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:130
bool is_finished() const
Definition: XrdPfcFile.hh:149
bool is_ok() const
Definition: XrdPfcFile.hh:150
void set_error(int err)
Definition: XrdPfcFile.hh:154
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:166
int m_n_cksum_errors
Definition: XrdPfcFile.hh:128
char * get_buff() const
Definition: XrdPfcFile.hh:140
IO * get_io() const
Definition: XrdPfcFile.hh:146
void set_downloaded()
Definition: XrdPfcFile.hh:153
bool req_cksum_net() const
Definition: XrdPfcFile.hh:164
void * m_req_id
Definition: XrdPfcFile.hh:116
bool has_cksums() const
Definition: XrdPfcFile.hh:165
File * get_file() const
Definition: XrdPfcFile.hh:145
bool is_failed() const
Definition: XrdPfcFile.hh:151
long long m_offset
Definition: XrdPfcFile.hh:119
vCkSum_t m_cksum_vec
Definition: XrdPfcFile.hh:127
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:157
bool m_req_cksum_net
Definition: XrdPfcFile.hh:126
int get_req_size() const
Definition: XrdPfcFile.hh:142
void Done(int result) override
Definition: XrdPfcFile.cc:1622
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
Definition: XrdPfcFile.hh:198
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:286
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1470
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:726
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1568
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1008
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:108
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1558
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1563
int GetNBlocks() const
Definition: XrdPfcFile.hh:285
void Prefetch()
Definition: XrdPfcFile.cc:1485
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1582
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:282
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:283
void AddIO(IO *io)
Definition: XrdPfcFile.cc:310
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:280
int GetBlockSize() const
Definition: XrdPfcFile.hh:284
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:286
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:180
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:121
int inc_ref_cnt()
Definition: XrdPfcFile.hh:291
int GetPrefetchCountOnIO(IO *io)
void Sync()
Sync file cache inf o and output data with disk.
Definition: XrdPfcFile.cc:1091
int dec_ref_cnt()
Definition: XrdPfcFile.hh:292
int get_ref_cnt()
Definition: XrdPfcFile.hh:290
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:689
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:194
long long GetFileSize() const
Definition: XrdPfcFile.hh:274
const Stats & RefStats() const
Definition: XrdPfcFile.hh:287
~File()
Destructor.
Definition: XrdPfcFile.cc:79
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:347
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:172
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:269
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:295
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:203
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
const AStat * GetLastAccessStats() const
Get latest access stats.
Definition: XrdPfcInfo.cc:491
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:469
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
Definition: XrdPfcInfo.hh:398
size_t GetAccessCnt() const
Get number of accesses.
Definition: XrdPfcInfo.hh:261
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XrdSysError Log
Definition: XrdConfig.cc:112
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:106
std::vector< ChunkRequest >::iterator vChunkRequest_i
Definition: XrdPfcFile.hh:107
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:171
std::vector< uint32_t > vCkSum_t
Definition: XrdPfcTypes.hh:31
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:172
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
Definition: XrdPfcFile.hh:101
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:96
Access statistics.
Definition: XrdPfcInfo.hh:57
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:60
unsigned short m_seq_id
Definition: XrdPfcFile.hh:59
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:62
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:86
ReadRequest(IO *io, ReadReqRH *rh)
Definition: XrdPfcFile.hh:82
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:72
bool is_complete() const
Definition: XrdPfcFile.hh:88
int return_value() const
Definition: XrdPfcFile.hh:89
long long m_bytes_read
Definition: XrdPfcFile.hh:74