XRootD
XrdPfcFile.cc
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 
20 #include "XrdPfcFile.hh"
21 #include "XrdPfc.hh"
22 #include "XrdPfcResourceMonitor.hh"
23 #include "XrdPfcIO.hh"
24 #include "XrdPfcTrace.hh"
25 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClConstants.hh"
28 #include "XrdCl/XrdClFile.hh"
29 #include "XrdSys/XrdSysTimer.hh"
30 #include "XrdOss/XrdOss.hh"
31 #include "XrdOuc/XrdOucEnv.hh"
33 
34 #include <cstdio>
35 #include <sstream>
36 #include <fcntl.h>
37 #include <cassert>
38 
39 
40 using namespace XrdPfc;
41 
42 namespace
43 {
44 
45 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
46 
47 Cache* cache() { return &Cache::GetInstance(); }
48 
49 }
50 
51 const char *File::m_traceID = "File";
52 
53 //------------------------------------------------------------------------------
54 
55 File::File(const std::string& path, long long iOffset, long long iFileSize) :
56  m_ref_cnt(0),
57  m_data_file(0),
58  m_info_file(0),
59  m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60  m_filename(path),
61  m_offset(iOffset),
62  m_file_size(iFileSize),
63  m_current_io(m_io_set.end()),
64  m_ios_in_detach(0),
65  m_non_flushed_cnt(0),
66  m_in_sync(false),
67  m_detach_time_logged(false),
68  m_in_shutdown(false),
69  m_state_cond(0),
70  m_block_size(0),
71  m_num_blocks(0),
72  m_resmon_token(-1),
73  m_prefetch_state(kOff),
74  m_prefetch_read_cnt(0),
75  m_prefetch_hit_cnt(0),
76  m_prefetch_score(0)
77 {}
78 
79 File::~File()
80 {
81  if (m_info_file)
82  {
83  TRACEF(Debug, "~File() close info ");
84  m_info_file->Close();
85  delete m_info_file;
86  m_info_file = nullptr;
87  }
88 
89  if (m_data_file)
90  {
91  TRACEF(Debug, "~File() close output ");
92  m_data_file->Close();
93  delete m_data_file;
94  m_data_file = nullptr;
95  }
96 
97  if (m_resmon_token >= 0)
98  {
99  // Last update of file stats has been sent from the final Sync.
100  Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
101  }
102 
103  TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
104 }
105 
106 //------------------------------------------------------------------------------
107 
108 File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
109 {
110  File *file = new File(path, offset, fileSize);
111  if ( ! file->Open())
112  {
113  delete file;
114  file = 0;
115  }
116  return file;
117 }
118 
119 //------------------------------------------------------------------------------
120 
121 void File::initiate_emergency_shutdown()
122 {
123  // Called from Cache::Unlink() when the file is currently open.
124  // Cache::Unlink is also called on FSync error and when a wrong number of bytes
125  // is received from a remote read.
126  //
127  // From this point onward the file will not be written to, cinfo file will
128  // not be updated, and all new read requests will return -ENOENT.
129  //
130  // File's entry in the Cache's active map is set to nullptr and will be
131  // removed from there shortly, in any case, well before this File object
132  // shuts down. So we do not communicate to Cache about our destruction when
133  // it happens.
134 
135  {
136  XrdSysCondVarHelper _lck(m_state_cond);
137 
138  m_in_shutdown = true;
139 
140  if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
141  {
142  m_prefetch_state = kStopped;
143  cache()->DeRegisterPrefetchFile(this);
144  }
145  }
146 }
147 
148 //------------------------------------------------------------------------------
149 
150 void File::check_delta_stats()
151 {
152  // Called under m_state_cond lock.
153  // BytesWritten indirectly triggers an unconditional merge through the periodic Sync().
154  if (m_delta_stats.BytesRead() >= m_resmon_report_threshold)
155  report_and_merge_delta_stats();
156 }
157 
158 void File::report_and_merge_delta_stats()
159 {
160  // Called under m_state_cond lock.
161  struct stat s;
162  m_data_file->Fstat(&s);
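 // Note (illustrative): Fstat() reports st_blocks in 512-byte units, so the delta stored
 // in m_StBlocksAdded below tracks actual allocated disk space, which can differ from the
 // logical bytes written (e.g. for sparse regions).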
163  m_delta_stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
164  m_st_blocks = s.st_blocks;
165  Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
166  m_stats.AddUp(m_delta_stats);
167  m_delta_stats.Reset();
168 }
169 
170 //------------------------------------------------------------------------------
171 
172 void File::BlockRemovedFromWriteQ(Block* b)
173 {
174  TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
175 
176  XrdSysCondVarHelper _lck(m_state_cond);
177  dec_ref_count(b);
178 }
179 
180 void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
181 {
182  TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
183 
184  XrdSysCondVarHelper _lck(m_state_cond);
185 
186  for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
187  {
188  dec_ref_count(*i);
189  }
190 }
191 
192 //------------------------------------------------------------------------------
193 
194 void File::ioUpdated(IO *io)
195 {
196  std::string loc(io->GetLocation());
197  XrdSysCondVarHelper _lck(m_state_cond);
198  insert_remote_location(loc);
199 }
200 
201 //------------------------------------------------------------------------------
202 
203 bool File::ioActive(IO *io)
204 {
205  // Returns true if delay is needed.
206 
207  TRACEF(Debug, "ioActive start for io " << io);
208 
209  std::string loc(io->GetLocation());
210 
211  {
212  XrdSysCondVarHelper _lck(m_state_cond);
213 
214  IoSet_i mi = m_io_set.find(io);
215 
216  if (mi != m_io_set.end())
217  {
218  unsigned int n_active_reads = io->m_active_read_reqs;
219 
220  TRACE(Info, "ioActive for io " << io <<
221  ", active_reads " << n_active_reads <<
222  ", active_prefetches " << io->m_active_prefetches <<
223  ", allow_prefetching " << io->m_allow_prefetching <<
224  ", ios_in_detach " << m_ios_in_detach);
225  TRACEF(Info,
226  "\tio_map.size() " << m_io_set.size() <<
227  ", block_map.size() " << m_block_map.size() << ", file");
228 
229  insert_remote_location(loc);
230 
231  io->m_allow_prefetching = false;
232  io->m_in_detach = true;
233 
234  // Check if any IO is still available for prefetching. If not, stop it.
235  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
236  {
237  if ( ! select_current_io_or_disable_prefetching(false) )
238  {
239  TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
240  }
241  }
242 
243  // On last IO, consider write queue blocks. Note, this also contains
244  // blocks being prefetched.
245 
246  bool io_active_result;
247 
248  if (n_active_reads > 0)
249  {
250  io_active_result = true;
251  }
252  else if (m_io_set.size() - m_ios_in_detach == 1)
253  {
254  io_active_result = ! m_block_map.empty();
255  }
256  else
257  {
258  io_active_result = io->m_active_prefetches > 0;
259  }
260 
261  if ( ! io_active_result)
262  {
263  ++m_ios_in_detach;
264  }
265 
266  TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
267 
268  return io_active_result;
269  }
270  else
271  {
272  TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
273  return false;
274  }
275  }
276 }
277 
278 //------------------------------------------------------------------------------
279 
280 void File::RequestSyncOfDetachStats()
281 {
282  XrdSysCondVarHelper _lck(m_state_cond);
283  m_detach_time_logged = false;
284 }
285 
286 bool File::FinalizeSyncBeforeExit()
287 {
288  // Returns true if sync is required.
289  // This method is called after corresponding IO is detached from PosixCache.
290 
291  XrdSysCondVarHelper _lck(m_state_cond);
292  if ( ! m_in_shutdown)
293  {
294  if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
295  {
296  report_and_merge_delta_stats();
297  m_cfi.WriteIOStatDetach(m_stats);
298  m_detach_time_logged = true;
299  m_in_sync = true;
300  TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
301  return true;
302  }
303  }
304  TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
305  return false;
306 }
307 
308 //------------------------------------------------------------------------------
309 
310 void File::AddIO(IO *io)
311 {
312  // Called from Cache::GetFile() when a new IO asks for the file.
313 
314  TRACEF(Debug, "AddIO() io = " << (void*)io);
315 
316  time_t now = time(0);
317  std::string loc(io->GetLocation());
318 
319  m_state_cond.Lock();
320 
321  IoSet_i mi = m_io_set.find(io);
322 
323  if (mi == m_io_set.end())
324  {
325  m_io_set.insert(io);
326  io->m_attach_time = now;
327  m_delta_stats.IoAttach();
328 
329  insert_remote_location(loc);
330 
331  if (m_prefetch_state == kStopped)
332  {
333  m_prefetch_state = kOn;
334  cache()->RegisterPrefetchFile(this);
335  }
336  }
337  else
338  {
339  TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
340  }
341 
342  m_state_cond.UnLock();
343 }
344 
345 //------------------------------------------------------------------------------
346 
347 void File::RemoveIO(IO *io)
348 {
349  // Called from Cache::ReleaseFile.
350 
351  TRACEF(Debug, "RemoveIO() io = " << (void*)io);
352 
353  time_t now = time(0);
354 
355  m_state_cond.Lock();
356 
357  IoSet_i mi = m_io_set.find(io);
358 
359  if (mi != m_io_set.end())
360  {
361  if (mi == m_current_io)
362  {
363  ++m_current_io;
364  }
365 
366  m_delta_stats.IoDetach(now - io->m_attach_time);
367  m_io_set.erase(mi);
368  --m_ios_in_detach;
369 
370  if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
371  {
372  TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
373  m_prefetch_state = kStopped;
374  cache()->DeRegisterPrefetchFile(this);
375  }
376  }
377  else
378  {
379  TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
380  }
381 
382  m_state_cond.UnLock();
383 }
384 
385 //------------------------------------------------------------------------------
386 
387 bool File::Open()
388 {
389  // Sets errno accordingly.
390 
391  static const char *tpfx = "Open() ";
392 
393  TRACEF(Dump, tpfx << "entered");
394 
395  // Before touching anything, check with ResourceMonitor if a scan is in progress.
396  // This function will wait internally if needed until it is safe to proceed.
397  Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);
398 
399  const Configuration &conf = Cache::GetInstance().RefConfiguration();
400 
401  XrdOss &myOss = * Cache::GetInstance().GetOss();
402  const char *myUser = conf.m_username.c_str();
403  XrdOucEnv myEnv;
404  struct stat data_stat, info_stat;
405 
406  std::string ifn = m_filename + Info::s_infoExtension;
407 
408  bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
409  bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
410 
411  // Create the data file itself.
412  char size_str[32]; sprintf(size_str, "%lld", m_file_size);
413  myEnv.Put("oss.asize", size_str);
414  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
415 
416  int res;
417 
418  if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
419  {
420  TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
421  errno = -res;
422  return false;
423  }
424 
425  m_data_file = myOss.newFile(myUser);
426  if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
427  {
428  TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
429  errno = -res;
430  delete m_data_file; m_data_file = 0;
431  return false;
432  }
433 
434  myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
435  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
436  if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
437  {
438  TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
439  errno = -res;
440  m_data_file->Close(); delete m_data_file; m_data_file = 0;
441  return false;
442  }
443 
444  m_info_file = myOss.newFile(myUser);
445  if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
446  {
447  TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
448  errno = -res;
449  delete m_info_file; m_info_file = 0;
450  m_data_file->Close(); delete m_data_file; m_data_file = 0;
451  return false;
452  }
453 
454  bool initialize_info_file = true;
455 
456  if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
457  {
458  TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
459  ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
460  ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
461 
462  // Check if data file exists and is of reasonable size.
463  if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
464  {
465  initialize_info_file = false;
466  } else {
467  TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
468  m_cfi.ResetAllAccessStats();
469  m_data_file->Ftruncate(0);
470  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
471  }
472  }
473 
474  if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
475  {
476  if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
477  conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
478  {
479  TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
480  initialize_info_file = true;
481  m_cfi.ResetAllAccessStats();
482  m_data_file->Ftruncate(0);
483  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
484  } else {
485  // TODO: If the file is complete, we don't need to reset net cksums.
486  m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
487  }
488  }
489 
490  if (initialize_info_file)
491  {
492  m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
493  m_cfi.SetCkSumState(conf.get_cs_Chk());
494  m_cfi.ResetNoCkSumTime();
495  m_cfi.Write(m_info_file, ifn.c_str());
496  m_info_file->Fsync();
497  TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
498  }
499  else
500  {
501  if (futimens(m_info_file->getFD(), NULL)) {
502  TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
503  }
504  }
505 
506  m_cfi.WriteIOStatAttach();
507  m_state_cond.Lock();
508  m_block_size = m_cfi.GetBufferSize();
509  m_num_blocks = m_cfi.GetNBlocks();
510  m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
511 
512  m_data_file->Fstat(&data_stat);
513  m_st_blocks = data_stat.st_blocks;
514 
515  m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
516  m_resmon_report_threshold = std::min(std::max(200ll * 1024, m_file_size / 50), 500ll * 1024 * 1024);
517  // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
518  // actual threshold based on return values from register_file_update_stats().
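 // For illustration, the clamp above works out to: a 5 MB file gives file_size/50 of
 // about 100 KB, which is raised to the 200 KB floor; a 1 GB file gives ~20 MB; files
 // above roughly 25 GB hit the 500 MB ceiling.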
519 
520  m_state_cond.UnLock();
521 
522  return true;
523 }
524 
525 
526 //==============================================================================
527 // Read and helpers
528 //==============================================================================
529 
530 bool File::overlap(int blk, // block to query
531  long long blk_size, //
532  long long req_off, // offset of user request
533  int req_size, // size of user request
534  // output:
535  long long &off, // offset in user buffer
536  long long &blk_off, // offset in block
537  int &size) // size to copy
538 {
539  const long long beg = blk * blk_size;
540  const long long end = beg + blk_size;
541  const long long req_end = req_off + req_size;
542 
543  if (req_off < end && req_end > beg)
544  {
545  const long long ovlp_beg = std::max(beg, req_off);
546  const long long ovlp_end = std::min(end, req_end);
547 
548  off = ovlp_beg - req_off;
549  blk_off = ovlp_beg - beg;
550  size = (int) (ovlp_end - ovlp_beg);
551 
552  assert(size <= blk_size);
553  return true;
554  }
555  else
556  {
557  return false;
558  }
559 }
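// Worked example for overlap() above (purely illustrative): with blk = 2, blk_size = 1 MiB,
// req_off = 2.5 MiB and req_size = 1 MiB, the block covers [2 MiB, 3 MiB) and the request
// covers [2.5 MiB, 3.5 MiB); the overlap [2.5 MiB, 3 MiB) yields off = 0 (start of the
// user buffer), blk_off = 0.5 MiB and size = 0.5 MiB.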
560 
561 //------------------------------------------------------------------------------
562 
563 Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
564 {
565  // Must be called w/ state_cond locked.
566  // Checks on size etc should be done before.
567  //
568  // Reference count is 0 so increase it in calling function if you want to
569  // catch the block while still in memory.
570 
571  const long long off = i * m_block_size;
572  const int last_block = m_num_blocks - 1;
573  const bool cs_net = cache()->RefConfiguration().is_cschk_net();
574 
575  int blk_size, req_size;
576  if (i == last_block) {
577  blk_size = req_size = m_file_size - off;
578  if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
579  } else {
580  blk_size = req_size = m_block_size;
581  }
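 // For the last block with network checksums enabled, req_size is rounded up to the next
 // 4 KiB boundary (e.g. a 1,000,000 byte tail block is requested as 1,003,520 bytes while
 // blk_size stays 1,000,000), presumably so pgRead() operates on whole checksum pages.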
582 
583  Block *b = 0;
584  char *buf = cache()->RequestRAM(req_size);
585 
586  if (buf)
587  {
588  b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
589 
590  if (b)
591  {
592  m_block_map[i] = b;
593 
594  // Actual Read request is issued in ProcessBlockRequests().
595 
596  if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
597  {
598  m_prefetch_state = kHold;
599  cache()->DeRegisterPrefetchFile(this);
600  }
601  }
602  else
603  {
604  TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
605  }
606  }
607 
608  return b;
609 }
610 
611 void File::ProcessBlockRequest(Block *b)
612 {
613  // This *must not* be called with block_map locked.
614 
615  BlockResponseHandler *brh = new BlockResponseHandler(b);
616 
617  if (XRD_TRACE What >= TRACE_Dump) {
618  char buf[256];
619  snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
620  b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
621  TRACEF(Dump, "ProcessBlockRequest() " << buf);
622  }
623 
624  if (b->req_cksum_net())
625  {
626  b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
627  b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
628  } else {
629  b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
630  }
631 }
632 
633 void File::ProcessBlockRequests(BlockList_t& blks)
634 {
635  // This *must not* be called with block_map locked.
636 
637  for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
638  {
639  ProcessBlockRequest(*bi);
640  }
641 }
642 
643 //------------------------------------------------------------------------------
644 
645 void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
646 {
647  int n_chunks = ioVec.size();
648  int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
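 // Ceiling division: for illustration, if XrdProto::maxRvecsz were 1024 elements, then
 // 2500 chunks would give n_vec_reads = 3, and the loop below would issue ReadVs of
 // 1024, 1024 and 452 chunks.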
649 
650  TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
651  ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
652 
653  DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
654 
655  int pos = 0;
656  while (n_chunks > XrdProto::maxRvecsz) {
657  io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
658  pos += XrdProto::maxRvecsz;
659  n_chunks -= XrdProto::maxRvecsz;
660  }
661  io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
662 }
663 
664 //------------------------------------------------------------------------------
665 
666 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
667 {
668  TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
669 
670  long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
671 
672  if (rs < 0)
673  {
674  TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
675  return rs;
676  }
677 
678  if (rs != expected_size)
679  {
680  TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
681  return -EIO;
682  }
683 
684  return (int) rs;
685 }
686 
687 //------------------------------------------------------------------------------
688 
689 int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
690 {
691  // rrc_func is ONLY called from async processing.
692  // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
693  // This streamlines implementation of synchronous IO::Read().
694 
695  TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
696 
697  m_state_cond.Lock();
698 
699  if (m_in_shutdown || io->m_in_detach)
700  {
701  m_state_cond.UnLock();
702  return m_in_shutdown ? -ENOENT : -EBADF;
703  }
704 
705  // Shortcut -- file is fully downloaded.
706 
707  if (m_cfi.IsComplete())
708  {
709  m_state_cond.UnLock();
710  int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
711  if (ret > 0) {
712  XrdSysCondVarHelper _lck(m_state_cond);
713  m_delta_stats.AddBytesHit(ret);
714  check_delta_stats();
715  }
716  return ret;
717  }
718 
719  XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
720 
721  return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
722 }
723 
724 //------------------------------------------------------------------------------
725 
726 int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
727 {
728  TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
729 
730  m_state_cond.Lock();
731 
732  if (m_in_shutdown || io->m_in_detach)
733  {
734  m_state_cond.UnLock();
735  return m_in_shutdown ? -ENOENT : -EBADF;
736  }
737 
738  // Shortcut -- file is fully downloaded.
739 
740  if (m_cfi.IsComplete())
741  {
742  m_state_cond.UnLock();
743  int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
744  if (ret > 0) {
745  XrdSysCondVarHelper _lck(m_state_cond);
746  m_delta_stats.AddBytesHit(ret);
747  check_delta_stats();
748  }
749  return ret;
750  }
751 
752  return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
753 }
754 
755 //------------------------------------------------------------------------------
756 
757 int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
758  ReadReqRH *rh, const char *tpfx)
759 {
760  // Non-trivial processing for Read and ReadV.
761  // Entered under lock.
762  //
763  // loop over required blocks:
764  // - if on disk, ok;
765  // - if in ram or incoming, inc ref-count
766  // - otherwise request and inc ref count (unless RAM full => request direct)
767  // unlock
768 
769  int prefetch_cnt = 0;
770 
771  ReadRequest *read_req = nullptr;
772  BlockList_t blks_to_request; // blocks we are issuing a new remote request for
773 
774  std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
775 
776  std::vector<XrdOucIOVec> iovec_disk;
777  std::vector<XrdOucIOVec> iovec_direct;
778  int iovec_disk_total = 0;
779  int iovec_direct_total = 0;
780 
781  for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
782  {
783  const XrdOucIOVec &iov = readV[iov_idx];
784  long long iUserOff = iov.offset;
785  int iUserSize = iov.size;
786  char *iUserBuff = iov.data;
787 
788  const int idx_first = iUserOff / m_block_size;
789  const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
790 
791  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
792 
793  enum LastBlock_e { LB_other, LB_disk, LB_direct };
794 
795  LastBlock_e lbe = LB_other;
796 
797  for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
798  {
799  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
800  BlockMap_i bi = m_block_map.find(block_idx);
801 
802  // overlap and read
803  long long off; // offset in user buffer
804  long long blk_off; // offset in block
805  int size; // size to copy
806 
807  overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
808 
809  // In RAM or incoming?
810  if (bi != m_block_map.end())
811  {
812  inc_ref_count(bi->second);
813  TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
814 
815  if (bi->second->is_finished())
816  {
817  // note, blocks with error should not be here !!!
818  // they should be either removed or reissued in ProcessBlockResponse()
819  assert(bi->second->is_ok());
820 
821  blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
822 
823  if (bi->second->m_prefetch)
824  ++prefetch_cnt;
825  }
826  else
827  {
828  if ( ! read_req)
829  read_req = new ReadRequest(io, rh);
830 
831  // We have a lock on state_cond --> as we register the request before releasing the lock,
832  // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
833 
834  bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
835  ++read_req->m_n_chunk_reqs;
836  }
837 
838  lbe = LB_other;
839  }
840  // On disk?
841  else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
842  {
843  TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
844 
845  if (lbe == LB_disk)
846  iovec_disk.back().size += size;
847  else
848  iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
849  iovec_disk_total += size;
850 
851  if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
852  ++prefetch_cnt;
853 
854  lbe = LB_disk;
855  }
856  // Neither ... then we have to go get it ...
857  else
858  {
859  if ( ! read_req)
860  read_req = new ReadRequest(io, rh);
861 
862  // Is there room for one more RAM Block?
863  Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
864  if (b)
865  {
866  TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
867  inc_ref_count(b);
868  blks_to_request.push_back(b);
869 
870  b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
871  ++read_req->m_n_chunk_reqs;
872 
873  lbe = LB_other;
874  }
875  else // Nope ... read this directly without caching.
876  {
877  TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
878 
879  iovec_direct_total += size;
880  read_req->m_direct_done = false;
881 
882  // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
883  // The number of actual ReadVs issued, so as not to exceed the XrdProto::maxRvecsz limit,
884  // is determined in RequestBlocksDirect().
885  if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
886  iovec_direct.back().size += size;
887  } else {
888  long long in_offset = block_idx * m_block_size + blk_off;
889  char *out_pos = iUserBuff + off;
890  while (size > XrdProto::maxRVdsz) {
891  iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
892  in_offset += XrdProto::maxRVdsz;
893  out_pos += XrdProto::maxRVdsz;
894  size -= XrdProto::maxRVdsz;
895  }
896  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
897  }
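 // For illustration, if XrdProto::maxRVdsz were 2 MiB, a single 5 MiB uncached span would
 // be split by the loop above into iovec entries of 2 MiB, 2 MiB and 1 MiB.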
898 
899  lbe = LB_direct;
900  }
901  }
902  } // end for over blocks in an IOVec
903  } // end for over readV IOVec
904 
905  inc_prefetch_hit_cnt(prefetch_cnt);
906 
907  m_state_cond.UnLock();
908 
909  // First, send out remote requests for new blocks.
910  if ( ! blks_to_request.empty())
911  {
912  ProcessBlockRequests(blks_to_request);
913  blks_to_request.clear();
914  }
915 
916  // Second, send out remote direct read requests.
917  if ( ! iovec_direct.empty())
918  {
919  RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
920 
921  TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
922  }
923 
924  // Begin synchronous part where we process data that is already in RAM or on disk.
925 
926  long long bytes_read = 0;
927  int error_cond = 0; // to be set to -errno
928 
929  // Third, process blocks that are available in RAM.
930  if ( ! blks_ready.empty())
931  {
932  for (auto &bvi : blks_ready)
933  {
934  for (auto &cr : bvi.second)
935  {
936  TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
937  memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
938  bytes_read += cr.m_size;
939  }
940  }
941  }
942 
943  // Fourth, read blocks from disk.
944  if ( ! iovec_disk.empty())
945  {
946  int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
947  TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
948  if (rc >= 0)
949  {
950  bytes_read += rc;
951  }
952  else
953  {
954  error_cond = rc;
955  TRACEF(Error, tpfx << "failed read from disk");
956  }
957  }
958 
959  // End synchronous part -- update with sync stats and determine actual state of this read.
960  // Note: remote reads might have already finished during disk-read!
961 
962  m_state_cond.Lock();
963 
964  for (auto &bvi : blks_ready)
965  dec_ref_count(bvi.first, (int) bvi.second.size());
966 
967  if (read_req)
968  {
969  read_req->m_bytes_read += bytes_read;
970  read_req->update_error_cond(error_cond);
971  read_req->m_stats.m_BytesHit += bytes_read;
972  read_req->m_sync_done = true;
973 
974  if (read_req->is_complete())
975  {
976  // Almost like FinalizeReadRequest(read_req) -- but no callout!
977  m_delta_stats.AddReadStats(read_req->m_stats);
978  check_delta_stats();
979  m_state_cond.UnLock();
980 
981  int ret = read_req->return_value();
982  delete read_req;
983  return ret;
984  }
985  else
986  {
987  m_state_cond.UnLock();
988  return -EWOULDBLOCK;
989  }
990  }
991  else
992  {
993  m_delta_stats.m_BytesHit += bytes_read;
994  check_delta_stats();
995  m_state_cond.UnLock();
996 
997  // !!! No callout.
998 
999  return error_cond ? error_cond : bytes_read;
1000  }
1001 }
1002 
1003 
1004 //==============================================================================
1005 // WriteBlock and Sync
1006 //==============================================================================
1007 
1008 void File::WriteBlockToDisk(Block* b)
1009 {
1010  // write block buffer into disk file
1011  long long offset = b->m_offset - m_offset;
1012  long long size = b->get_size();
1013  ssize_t retval;
1014 
1015  if (m_cfi.IsCkSumCache())
1016  if (b->has_cksums())
1017  retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1018  else
1019  retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1020  else
1021  retval = m_data_file->Write(b->get_buff(), offset, size);
1022 
1023  if (retval < size)
1024  {
1025  if (retval < 0)
1026  {
1027  GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
1028  }
1029  else
1030  {
1031  TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1032  }
1033 
1034  XrdSysCondVarHelper _lck(m_state_cond);
1035 
1036  dec_ref_count(b);
1037 
1038  return;
1039  }
1040 
1041  const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1042 
1043  // Set written bit.
1044  TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1045 
1046  bool schedule_sync = false;
1047  {
1048  XrdSysCondVarHelper _lck(m_state_cond);
1049 
1050  m_cfi.SetBitWritten(blk_idx);
1051 
1052  if (b->m_prefetch)
1053  {
1054  m_cfi.SetBitPrefetch(blk_idx);
1055  }
1056  if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1057  {
1058  m_cfi.ResetCkSumNet();
1059  }
1060 
1061  dec_ref_count(b);
1062 
1063  // Set synced bit or stash block index if in actual sync.
1064  // Synced state is only written out to cinfo file when data file is synced.
1065  if (m_in_sync)
1066  {
1067  m_writes_during_sync.push_back(blk_idx);
1068  }
1069  else
1070  {
1071  m_cfi.SetBitSynced(blk_idx);
1072  ++m_non_flushed_cnt;
1073  if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1074  ! m_in_shutdown)
1075  {
1076  schedule_sync = true;
1077  m_in_sync = true;
1078  m_non_flushed_cnt = 0;
1079  }
1080  }
1081  }
1082 
1083  if (schedule_sync)
1084  {
1085  cache()->ScheduleFileSync(this);
1086  }
1087 }
1088 
1089 //------------------------------------------------------------------------------
1090 
1091 void File::Sync()
1092 {
1093  TRACEF(Dump, "Sync()");
1094 
1095  int ret = m_data_file->Fsync();
1096  bool errorp = false;
1097  if (ret == XrdOssOK)
1098  {
1099  Stats loc_stats;
1100  {
1101  XrdSysCondVarHelper _lck(&m_state_cond);
1102  report_and_merge_delta_stats();
1103  loc_stats = m_stats;
1104  }
1105  m_cfi.WriteIOStat(loc_stats);
1106  m_cfi.Write(m_info_file, m_filename.c_str());
1107  int cret = m_info_file->Fsync();
1108  if (cret != XrdOssOK)
1109  {
1110  TRACEF(Error, "Sync cinfo file sync error " << cret);
1111  errorp = true;
1112  }
1113  }
1114  else
1115  {
1116  TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1117  errorp = true;
1118  }
1119 
1120  if (errorp)
1121  {
1122  TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1123 
1124  // Unlink will also call this->initiate_emergency_shutdown()
1125  Cache::GetInstance().UnlinkFile(m_filename, false);
1126 
1127  XrdSysCondVarHelper _lck(&m_state_cond);
1128 
1129  m_writes_during_sync.clear();
1130  m_in_sync = false;
1131 
1132  return;
1133  }
1134 
1135  int written_while_in_sync;
1136  bool resync = false;
1137  {
1138  XrdSysCondVarHelper _lck(&m_state_cond);
1139  for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1140  {
1141  m_cfi.SetBitSynced(*i);
1142  }
1143  written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1144  m_writes_during_sync.clear();
1145 
1146  // If there were writes during sync and the file is now complete,
1147  // let us call Sync again without resetting the m_in_sync flag.
1148  if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1149  resync = true;
1150  else
1151  m_in_sync = false;
1152  }
1153  TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1154 
1155  if (resync)
1156  Sync();
1157 }
1158 
1159 
1160 //==============================================================================
1161 // Block processing
1162 //==============================================================================
1163 
1164 void File::free_block(Block* b)
1165 {
1166  // Method always called under lock.
1167  int i = b->m_offset / m_block_size;
1168  TRACEF(Dump, "free_block block " << b << " idx = " << i);
1169  size_t ret = m_block_map.erase(i);
1170  if (ret != 1)
1171  {
1172  // assert might be a better option than a warning
1173  TRACEF(Error, "free_block did not erase " << i << " from map");
1174  }
1175  else
1176  {
1177  cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1178  delete b;
1179  }
1180 
1181  if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1182  {
1183  m_prefetch_state = kOn;
1184  cache()->RegisterPrefetchFile(this);
1185  }
1186 }
1187 
1188 //------------------------------------------------------------------------------
1189 
1190 bool File::select_current_io_or_disable_prefetching(bool skip_current)
1191 {
1192  // Method always called under lock. It also expects prefetch to be active.
1193 
1194  int io_size = (int) m_io_set.size();
1195  bool io_ok = false;
1196 
1197  if (io_size == 1)
1198  {
1199  io_ok = (*m_io_set.begin())->m_allow_prefetching;
1200  if (io_ok)
1201  {
1202  m_current_io = m_io_set.begin();
1203  }
1204  }
1205  else if (io_size > 1)
1206  {
1207  IoSet_i mi = m_current_io;
1208  if (skip_current && mi != m_io_set.end()) ++mi;
1209 
1210  for (int i = 0; i < io_size; ++i)
1211  {
1212  if (mi == m_io_set.end()) mi = m_io_set.begin();
1213 
1214  if ((*mi)->m_allow_prefetching)
1215  {
1216  m_current_io = mi;
1217  io_ok = true;
1218  break;
1219  }
1220  ++mi;
1221  }
1222  }
1223 
1224  if ( ! io_ok)
1225  {
1226  m_current_io = m_io_set.end();
1227  m_prefetch_state = kStopped;
1228  cache()->DeRegisterPrefetchFile(this);
1229  }
1230 
1231  return io_ok;
1232 }
1233 
1234 //------------------------------------------------------------------------------
1235 
1236 void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1237 {
1238  // Called from DirectResponseHandler.
1239  // NOT under lock.
1240 
1241  if (error_cond)
1242  TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1243 
1244  m_state_cond.Lock();
1245 
1246  if (error_cond)
1247  rreq->update_error_cond(error_cond);
1248  else {
1249  rreq->m_stats.m_BytesBypassed += bytes_read;
1250  rreq->m_bytes_read += bytes_read;
1251  }
1252 
1253  rreq->m_direct_done = true;
1254 
1255  bool rreq_complete = rreq->is_complete();
1256 
1257  m_state_cond.UnLock();
1258 
1259  if (rreq_complete)
1260  FinalizeReadRequest(rreq);
1261 }
1262 
1263 void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1264 {
1265  // Called from ProcessBlockResponse().
1266  // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1267  // Does not manage m_read_req.
1268  // Will not complete the request.
1269 
1270  TRACEF(Error, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1271  " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1272 
1273  rreq->update_error_cond(b->get_error());
1274  --rreq->m_n_chunk_reqs;
1275 
1276  dec_ref_count(b);
1277 }
1278 
1279 void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1280 {
1281  // Called from ProcessBlockResponse().
1282  // NOT under lock as it does a memcpy of existing block data.
1283  // Acquires lock for block, m_read_req and rreq state update.
1284 
1285  ReadRequest *rreq = creq.m_read_req;
1286 
1287  TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1288  memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1289 
1290  m_state_cond.Lock();
1291 
1292  rreq->m_bytes_read += creq.m_size;
1293 
1294  if (b->get_req_id() == (void*) rreq)
1295  rreq->m_stats.m_BytesMissed += creq.m_size;
1296  else
1297  rreq->m_stats.m_BytesHit += creq.m_size;
1298 
1299  --rreq->m_n_chunk_reqs;
1300 
1301  if (b->m_prefetch)
1302  inc_prefetch_hit_cnt(1);
1303 
1304  dec_ref_count(b);
1305 
1306  bool rreq_complete = rreq->is_complete();
1307 
1308  m_state_cond.UnLock();
1309 
1310  if (rreq_complete)
1311  FinalizeReadRequest(rreq);
1312 }
1313 
1314 void File::FinalizeReadRequest(ReadRequest *rreq)
1315 {
1316  // called from ProcessBlockResponse()
1317  // NOT under lock -- does callout
1318  {
1319  XrdSysCondVarHelper _lck(m_state_cond);
1320  m_delta_stats.AddReadStats(rreq->m_stats);
1321  check_delta_stats();
1322  }
1323 
1324  rreq->m_rh->Done(rreq->return_value());
1325  delete rreq;
1326 }
1327 
1328 void File::ProcessBlockResponse(Block *b, int res)
1329 {
1330  static const char* tpfx = "ProcessBlockResponse ";
1331 
1332  TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1333 
1334  if (res >= 0 && res != b->get_size())
1335  {
1336  // Incorrect number of bytes received; apparently the size of the file on the remote
1337  // is different from what the cache expects it to be.
1338  TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1339  Cache::GetInstance().UnlinkFile(m_filename, false);
1340  }
1341 
1342  m_state_cond.Lock();
1343 
1344  // Deregister block from IO's prefetch count, if needed.
1345  if (b->m_prefetch)
1346  {
1347  IO *io = b->get_io();
1348  IoSet_i mi = m_io_set.find(io);
1349  if (mi != m_io_set.end())
1350  {
1351  --io->m_active_prefetches;
1352 
1353  // If failed and IO is still prefetching -- disable prefetching on this IO.
1354  if (res < 0 && io->m_allow_prefetching)
1355  {
1356  TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1357  io->m_allow_prefetching = false;
1358 
1359  // Check if any IO is still available for prefetching. If not, stop it.
1360  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1361  {
1362  if ( ! select_current_io_or_disable_prefetching(false) )
1363  {
1364  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1365  }
1366  }
1367  }
1368 
1369  // If failed with no subscribers -- delete the block and exit.
1370  if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1371  {
1372  free_block(b);
1373  m_state_cond.UnLock();
1374  return;
1375  }
1376  }
1377  else
1378  {
1379  TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1380  }
1381  }
1382 
1383  if (res == b->get_size())
1384  {
1385  b->set_downloaded();
1386  TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1387  if ( ! m_in_shutdown)
1388  {
1389  // Increase ref-count for the writer.
1390  inc_ref_count(b);
1391  m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1392  // No check for writes, report-and-merge forced during Sync().
1393  cache()->AddWriteTask(b, true);
1394  }
1395 
1396  // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1397  vChunkRequest_t creqs_to_notify;
1398  creqs_to_notify.swap( b->m_chunk_reqs );
1399 
1400  m_state_cond.UnLock();
1401 
1402  for (auto &creq : creqs_to_notify)
1403  {
1404  ProcessBlockSuccess(b, creq);
1405  }
1406  }
1407  else
1408  {
1409  if (res < 0) {
1410  TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " error=" << res);
1411  } else {
1412  TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " incomplete, got " << res << " expected " << b->get_size());
1413 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1414  res = -EIO;
1415 #else
1416  res = -EREMOTEIO;
1417 #endif
1418  }
1419  b->set_error(res);
1420 
1421  // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1422  // Collect others with a different IO, the first of them will be used to reissue the request.
1423  // This is then done outside of lock.
1424  std::list<ReadRequest*> rreqs_to_complete;
1425  vChunkRequest_t creqs_to_keep;
1426 
1427  for(ChunkRequest &creq : b->m_chunk_reqs)
1428  {
1429  ReadRequest *rreq = creq.m_read_req;
1430 
1431  if (rreq->m_io == b->get_io())
1432  {
1433  ProcessBlockError(b, rreq);
1434  if (rreq->is_complete())
1435  {
1436  rreqs_to_complete.push_back(rreq);
1437  }
1438  }
1439  else
1440  {
1441  creqs_to_keep.push_back(creq);
1442  }
1443  }
1444 
1445  bool reissue = false;
1446  if ( ! creqs_to_keep.empty())
1447  {
1448  ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1449 
1450  TRACEF(Info, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1451  b->get_io() << " - reissuing request with my io " << rreq->m_io);
1452 
1453  b->reset_error_and_set_io(rreq->m_io, rreq);
1454  b->m_chunk_reqs.swap( creqs_to_keep );
1455  reissue = true;
1456  }
1457 
1458  m_state_cond.UnLock();
1459 
1460  for (auto rreq : rreqs_to_complete)
1461  FinalizeReadRequest(rreq);
1462 
1463  if (reissue)
1464  ProcessBlockRequest(b);
1465  }
1466 }
1467 
1468 //------------------------------------------------------------------------------
1469 
1470 const char* File::lPath() const
1471 {
1472  return m_filename.c_str();
1473 }
1474 
1475 //------------------------------------------------------------------------------
1476 
1477 int File::offsetIdx(int iIdx) const
1478 {
1479  return iIdx - m_offset/m_block_size;
1480 }
1481 
1482 
1483 //------------------------------------------------------------------------------
1484 
1485 void File::Prefetch()
1486 {
1487  // Check that block is not on disk and not in RAM.
1488  // TODO: Could prefetch several blocks at once!
1489  // blks_max could be an argument
1490 
1491  BlockList_t blks;
1492 
1493  TRACEF(DumpXL, "Prefetch() entering.");
1494  {
1495  XrdSysCondVarHelper _lck(m_state_cond);
1496 
1497  if (m_prefetch_state != kOn)
1498  {
1499  return;
1500  }
1501 
1502  if ( ! select_current_io_or_disable_prefetching(true) )
1503  {
1504  TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1505  return;
1506  }
1507 
1508  // Select block(s) to fetch.
1509  for (int f = 0; f < m_num_blocks; ++f)
1510  {
1511  if ( ! m_cfi.TestBitWritten(f))
1512  {
1513  int f_act = f + m_offset / m_block_size;
1514 
1515  BlockMap_i bi = m_block_map.find(f_act);
1516  if (bi == m_block_map.end())
1517  {
1518  Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1519  if (b)
1520  {
1521  TRACEF(Dump, "Prefetch take block " << f_act);
1522  blks.push_back(b);
1523  // Note: block ref_cnt not increased, it will be when placed into write queue.
1524 
1525  inc_prefetch_read_cnt(1);
1526  }
1527  else
1528  {
1529  // This shouldn't happen as prefetching stops when RAM is 70% full.
1530  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1531  }
1532  break;
1533  }
1534  }
1535  }
1536 
1537  if (blks.empty())
1538  {
1539  TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1540  m_prefetch_state = kComplete;
1541  cache()->DeRegisterPrefetchFile(this);
1542  }
1543  else
1544  {
1545  (*m_current_io)->m_active_prefetches += (int) blks.size();
1546  }
1547  }
1548 
1549  if ( ! blks.empty())
1550  {
1551  ProcessBlockRequests(blks);
1552  }
1553 }
1554 
1555 
1556 //------------------------------------------------------------------------------
1557 
1558 float File::GetPrefetchScore() const
1559 {
1560  return m_prefetch_score;
1561 }
1562 
1563 XrdSysError* File::GetLog()
1564 {
1565  return Cache::GetInstance().GetLog();
1566 }
1567 
1568 XrdSysTrace* File::GetTrace()
1569 {
1570  return Cache::GetInstance().GetTrace();
1571 }
1572 
1573 void File::insert_remote_location(const std::string &loc)
1574 {
1575  if ( ! loc.empty())
1576  {
1577  size_t p = loc.find_first_of('@');
1578  m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1579  }
1580 }
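// Example for insert_remote_location() above: a location of the form
// "user.123@host.example.org:1094" is stored as "host.example.org:1094";
// a location without an '@' is stored unchanged.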
1581 
1582 std::string File::GetRemoteLocations() const
1583 {
1584  std::string s;
1585  if ( ! m_remote_locations.empty())
1586  {
1587  size_t sl = 0;
1588  int nl = 0;
1589  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1590  {
1591  sl += i->size();
1592  }
1593  s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1594  s = '[';
1595  int j = 1;
1596  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1597  {
1598  s += '"'; s += *i; s += '"';
1599  if (j < nl) s += ',';
1600  }
1601  s += ']';
1602  }
1603  else
1604  {
1605  s = "[]";
1606  }
1607  return s;
1608 }
1609 
1610 //==============================================================================
1611 //======================= RESPONSE HANDLERS ==============================
1612 //==============================================================================
1613 
1614 void BlockResponseHandler::Done(int res)
1615 {
1616  m_block->m_file->ProcessBlockResponse(m_block, res);
1617  delete this;
1618 }
1619 
1620 //------------------------------------------------------------------------------
1621 
1622 void DirectResponseHandler::Done(int res)
1623 {
1624  m_mutex.Lock();
1625 
1626  int n_left = --m_to_wait;
1627 
1628  if (res < 0) {
1629  if (m_errno == 0) m_errno = res; // store first reported error
1630  } else {
1631  m_bytes_read += res;
1632  }
1633 
1634  m_mutex.UnLock();
1635 
1636  if (n_left == 0)
1637  {
1638  m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1639  delete this;
1640  }
1641 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACEF(act, x)
Definition: XrdPfcTrace.hh:63
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:41
int stat(const char *path, struct stat *buf)
#define XRD_TRACE
Definition: XrdScheduler.cc:48
@ Warning
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:99
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Fsync()
Definition: XrdOss.hh:144
virtual int Ftruncate(unsigned long long flen)
Definition: XrdOss.hh:164
virtual int Fstat(struct stat *buf)
Definition: XrdOss.hh:136
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
Definition: XrdOucCache.cc:86
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
void Done(int result) override
Definition: XrdPfcFile.cc:1614
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:168
int get_size() const
Definition: XrdPfcFile.hh:141
int get_error() const
Definition: XrdPfcFile.hh:155
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:167
void * get_req_id() const
Definition: XrdPfcFile.hh:147
long long get_offset() const
Definition: XrdPfcFile.hh:143
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:130
void set_error(int err)
Definition: XrdPfcFile.hh:154
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:166
char * get_buff() const
Definition: XrdPfcFile.hh:140
IO * get_io() const
Definition: XrdPfcFile.hh:146
void set_downloaded()
Definition: XrdPfcFile.hh:153
bool req_cksum_net() const
Definition: XrdPfcFile.hh:164
bool has_cksums() const
Definition: XrdPfcFile.hh:165
long long m_offset
Definition: XrdPfcFile.hh:119
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:157
int get_req_size() const
Definition: XrdPfcFile.hh:142
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:151
XrdOss * GetOss() const
Definition: XrdPfc.hh:263
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:278
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:199
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:133
XrdSysError * GetLog()
Definition: XrdPfc.hh:277
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:130
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1120
void Done(int result) override
Definition: XrdPfcFile.cc:1622
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:286
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1470
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:726
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1568
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1008
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:108
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1558
friend class BlockResponseHandler
Definition: XrdPfcFile.hh:209
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1563
void Prefetch()
Definition: XrdPfcFile.cc:1485
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1582
void AddIO(IO *io)
Definition: XrdPfcFile.cc:310
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:280
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:180
friend class DirectResponseHandler
Definition: XrdPfcFile.hh:210
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:121
void Sync()
Sync file cache info and output data with disk.
Definition: XrdPfcFile.cc:1091
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:689
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:194
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:347
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:172
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:269
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:203
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:30
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:70
const char * GetLocation()
Definition: XrdPfcIO.hh:44
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
Definition: XrdPfcInfo.hh:365
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
void SetBitSynced(int i)
Mark block as synced to disk.
Definition: XrdPfcInfo.hh:387
time_t GetNoCkSumTimeForUVKeep() const
Definition: XrdPfcInfo.hh:301
CkSumCheck_e GetCkSumState() const
Definition: XrdPfcInfo.hh:286
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
Definition: XrdPfcInfo.cc:422
void ResetCkSumNet()
Definition: XrdPfcInfo.cc:215
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
void DowngradeCkSumState(CkSumCheck_e css_ref)
Definition: XrdPfcInfo.hh:295
bool IsCkSumNet() const
Definition: XrdPfcInfo.hh:290
void ResetAllAccessStats()
Reset IO Stats.
Definition: XrdPfcInfo.cc:361
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
Definition: XrdPfcInfo.hh:376
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:447
bool IsCkSumCache() const
Definition: XrdPfcInfo.hh:289
void SetBitWritten(int i)
Mark block as written to disk.
Definition: XrdPfcInfo.hh:352
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:469
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:431
long long GetExpectedDataFileSize() const
Get expected data file size.
Definition: XrdPfcInfo.hh:420
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
Definition: XrdPfcInfo.hh:343
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:296
void SetCkSumState(CkSumCheck_e css)
Definition: XrdPfcInfo.hh:294
void ResetNoCkSumTime()
Definition: XrdPfcInfo.hh:302
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:163
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:440
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
void IoAttach()
Definition: XrdPfcStats.hh:85
void AddReadStats(const Stats &s)
Definition: XrdPfcStats.hh:67
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesBypassed
number of bytes served directly through XrdCl
Definition: XrdPfcStats.hh:41
void AddUp(const Stats &s)
Definition: XrdPfcStats.hh:114
void AddWriteStats(long long bytes_written, int n_cks_errs)
Definition: XrdPfcStats.hh:79
void AddBytesHit(long long bh)
Definition: XrdPfcStats.hh:74
long long BytesRead() const
Definition: XrdPfcStats.hh:97
long long m_BytesHit
number of bytes served from disk
Definition: XrdPfcStats.hh:39
void IoDetach(int duration)
Definition: XrdPfcStats.hh:90
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:106
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:171
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:172
static const int maxRVdsz
Definition: XProtocol.hh:688
static const int maxRvecsz
Definition: XProtocol.hh:686
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
char * data
Definition: XrdOucIOVec.hh:45
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:96
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:115
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition: XrdPfc.hh:80
CkSumCheck_e get_cs_Chk() const
Definition: XrdPfc.hh:73
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
bool should_uvkeep_purge(time_t delta) const
Definition: XrdPfc.hh:82
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
unsigned short m_seq_id
Definition: XrdPfcFile.hh:59
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:86
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:72
bool is_complete() const
Definition: XrdPfcFile.hh:88
int return_value() const
Definition: XrdPfcFile.hh:89
long long m_bytes_read
Definition: XrdPfcFile.hh:74