XRootD
XrdPfc.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include <fcntl.h>
20 #include <sstream>
21 #include <algorithm>
22 #include <sys/statvfs.h>
23 
24 #include "XrdCl/XrdClConstants.hh"
25 #include "XrdCl/XrdClURL.hh"
26 
27 #include "XrdOuc/XrdOucEnv.hh"
28 #include "XrdOuc/XrdOucUtils.hh"
30 
31 #include "XrdSys/XrdSysTimer.hh"
32 #include "XrdSys/XrdSysTrace.hh"
33 
35 
36 #include "XrdOss/XrdOss.hh"
37 
38 #include "XrdPfc.hh"
39 #include "XrdPfcTrace.hh"
40 #include "XrdPfcFSctl.hh"
41 #include "XrdPfcInfo.hh"
42 #include "XrdPfcIOFile.hh"
43 #include "XrdPfcIOFileBlock.hh"
44 #include "XrdPfcResourceMonitor.hh"
45 
46 using namespace XrdPfc;
47 
48 Cache *Cache::m_instance = nullptr;
49 XrdScheduler *Cache::schedP = nullptr;
50 
51 
53 {
55  return 0;
56 }
57 
59 {
61  return 0;
62 }
63 
64 void *PrefetchThread(void*)
65 {
67  return 0;
68 }
69 
70 //==============================================================================
71 
72 extern "C"
73 {
75  const char *config_filename,
76  const char *parameters,
77  XrdOucEnv *env)
78 {
79  XrdSysError err(logger, "");
80  err.Say("++++++ Proxy file cache initialization started.");
81 
82  if ( ! env ||
83  ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
84  {
87  }
88 
89  Cache &instance = Cache::CreateInstance(logger, env);
90 
91  if (! instance.Config(config_filename, parameters))
92  {
93  err.Say("Config Proxy file cache initialization failed.");
94  return 0;
95  }
96  err.Say("++++++ Proxy file cache initialization completed.");
97 
98  {
99  pthread_t tid;
100 
101  XrdSysThread::Run(&tid, ResourceMonitorThread, 0, 0, "XrdPfc ResourceMonitor");
102 
103  for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
104  {
105  XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
106  }
107 
108  if (instance.RefConfiguration().m_prefetch_max_blocks > 0)
109  {
110  XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
111  }
112  }
113 
114  XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
115  env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
116 
117  return &instance;
118 }
119 }
120 
121 //==============================================================================
122 
124 {
125  assert (m_instance == 0);
126  m_instance = new Cache(logger, env);
127  return *m_instance;
128 }
129 
130  Cache& Cache::GetInstance() { return *m_instance; }
131 const Cache& Cache::TheOne() { return *m_instance; }
132 const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
133  ResourceMonitor& Cache::ResMon() { return m_instance->RefResMon(); }
134 
136 {
137  if (! m_decisionpoints.empty())
138  {
139  XrdCl::URL url(io->Path());
140  std::string filename = url.GetPath();
141  std::vector<Decision*>::const_iterator it;
142  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
143  {
144  XrdPfc::Decision *d = *it;
145  if (! d) continue;
146  if (! d->Decide(filename, *m_oss))
147  {
148  return false;
149  }
150  }
151  }
152 
153  return true;
154 }
155 
157  XrdOucCache("pfc"),
158  m_env(env),
159  m_log(logger, "XrdPfc_"),
160  m_trace(new XrdSysTrace("XrdPfc", logger)),
161  m_traceID("Cache"),
162  m_oss(0),
163  m_gstream(0),
164  m_purge_pin(0),
165  m_prefetch_condVar(0),
166  m_prefetch_enabled(false),
167  m_RAM_used(0),
168  m_RAM_write_queue(0),
169  m_RAM_std_size(0),
170  m_isClient(false),
171  m_active_cond(0)
172 {
173  // Default log level is Warning.
174  m_trace->What = 2;
175 }
176 
178 {
179  const char* tpfx = "Attach() ";
180 
181  if (Cache::GetInstance().Decide(io))
182  {
183  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
184 
185  IO *cio;
186 
187  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
188  {
189  cio = new IOFileBlock(io, *this);
190  }
191  else
192  {
193  IOFile *iof = new IOFile(io, *this);
194 
195  if ( ! iof->HasFile())
196  {
197  delete iof;
198  // TODO - redirect instead. But this is kind of an awkward place for it.
199  // errno is set during IOFile construction.
200  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
201  return io;
202  }
203 
204  cio = iof;
205  }
206 
207  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
208  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
209 
210  return cio;
211  }
212  else
213  {
214  TRACE(Info, tpfx << "decision decline " << io->Path());
215  }
216  return io;
217 }
218 
219 void Cache::AddWriteTask(Block* b, bool fromRead)
220 {
221  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
222 
223  {
224  XrdSysMutexHelper lock(&m_RAM_mutex);
225  m_RAM_write_queue += b->get_size();
226  }
227 
228  m_writeQ.condVar.Lock();
229  if (fromRead)
230  m_writeQ.queue.push_back(b);
231  else
232  m_writeQ.queue.push_front(b);
233  m_writeQ.size++;
234  m_writeQ.condVar.Signal();
235  m_writeQ.condVar.UnLock();
236 }
237 
239 {
240  std::list<Block*> removed_blocks;
241  long long sum_size = 0;
242 
243  m_writeQ.condVar.Lock();
244  std::list<Block*>::iterator i = m_writeQ.queue.begin();
245  while (i != m_writeQ.queue.end())
246  {
247  if ((*i)->m_file == file)
248  {
249  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
250  std::list<Block*>::iterator j = i++;
251  removed_blocks.push_back(*j);
252  sum_size += (*j)->get_size();
253  m_writeQ.queue.erase(j);
254  --m_writeQ.size;
255  }
256  else
257  {
258  ++i;
259  }
260  }
261  m_writeQ.condVar.UnLock();
262 
263  {
264  XrdSysMutexHelper lock(&m_RAM_mutex);
265  m_RAM_write_queue -= sum_size;
266  }
267 
268  file->BlocksRemovedFromWriteQ(removed_blocks);
269 }
270 
272 {
273  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
274 
275  while (true)
276  {
277  m_writeQ.condVar.Lock();
278  while (m_writeQ.size == 0)
279  {
280  m_writeQ.condVar.Wait();
281  }
282 
283  // MT -- optimize to pop several blocks if they are available (or swap the list).
284  // This makes sense especially for smallish block sizes.
285 
286  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
287  long long sum_size = 0;
288 
289  for (int bi = 0; bi < n_pushed; ++bi)
290  {
291  Block* block = m_writeQ.queue.front();
292  m_writeQ.queue.pop_front();
293  m_writeQ.writes_between_purges += block->get_size();
294  sum_size += block->get_size();
295 
296  blks_to_write[bi] = block;
297 
298  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
299  }
300  m_writeQ.size -= n_pushed;
301 
302  m_writeQ.condVar.UnLock();
303 
304  {
305  XrdSysMutexHelper lock(&m_RAM_mutex);
306  m_RAM_write_queue -= sum_size;
307  }
308 
309  for (int bi = 0; bi < n_pushed; ++bi)
310  {
311  Block* block = blks_to_write[bi];
312 
313  block->m_file->WriteBlockToDisk(block);
314  }
315  }
316 }
317 
319 {
320  // Called from ResourceMonitor for an alternative estimation of disk writes.
321  XrdSysCondVarHelper lock(&m_writeQ.condVar);
322  long long ret = m_writeQ.writes_between_purges;
323  m_writeQ.writes_between_purges = 0;
324  return ret;
325 }
326 
327 //==============================================================================
328 
329 char* Cache::RequestRAM(long long size)
330 {
331  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
332 
333  bool std_size = (size == m_configuration.m_bufferSize);
334 
335  m_RAM_mutex.Lock();
336 
337  long long total = m_RAM_used + size;
338 
339  if (total <= m_configuration.m_RamAbsAvailable)
340  {
341  m_RAM_used = total;
342  if (std_size && m_RAM_std_size > 0)
343  {
344  char *buf = m_RAM_std_blocks.back();
345  m_RAM_std_blocks.pop_back();
346  --m_RAM_std_size;
347 
348  m_RAM_mutex.UnLock();
349 
350  return buf;
351  }
352  else
353  {
354  m_RAM_mutex.UnLock();
355  char *buf;
356  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
357  {
358  // Report out of mem? Probably should report it at least the first time,
359  // then periodically.
360  return 0;
361  }
362  return buf;
363  }
364  }
365  m_RAM_mutex.UnLock();
366  return 0;
367 }
368 
369 void Cache::ReleaseRAM(char* buf, long long size)
370 {
371  bool std_size = (size == m_configuration.m_bufferSize);
372  {
373  XrdSysMutexHelper lock(&m_RAM_mutex);
374 
375  m_RAM_used -= size;
376 
377  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
378  {
379  m_RAM_std_blocks.push_back(buf);
380  ++m_RAM_std_size;
381  return;
382  }
383  }
384  free(buf);
385 }
386 
387 File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
388 {
389  // Called from virtual IO::Attach
390 
391  TRACE(Debug, "GetFile " << path << ", io " << io);
392 
393  ActiveMap_i it;
394 
395  {
396  XrdSysCondVarHelper lock(&m_active_cond);
397 
398  while (true)
399  {
400  it = m_active.find(path);
401 
402  // File is not open or being opened. Mark it as being opened and
403  // proceed to opening it outside of while loop.
404  if (it == m_active.end())
405  {
406  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
407  break;
408  }
409 
410  if (it->second != 0)
411  {
412  it->second->AddIO(io);
413  inc_ref_cnt(it->second, false, true);
414 
415  return it->second;
416  }
417  else
418  {
419  // Wait for some change in m_active, then recheck.
420  m_active_cond.Wait();
421  }
422  }
423  }
424 
425  if (filesize == 0)
426  {
427  struct stat st;
428  int res = io->Fstat(st);
429  if (res < 0) {
430  errno = res;
431  TRACE(Error, "GetFile, could not get valid stat");
432  } else if (res > 0) {
433  errno = ENOTSUP;
434  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
435  } else {
436  filesize = st.st_size;
437  }
438  }
439 
440  File *file = 0;
441 
442  if (filesize >= 0)
443  {
444  file = File::FileOpen(path, off, filesize);
445  }
446 
447  {
448  XrdSysCondVarHelper lock(&m_active_cond);
449 
450  if (file)
451  {
452  inc_ref_cnt(file, false, true);
453  it->second = file;
454 
455  file->AddIO(io);
456  }
457  else
458  {
459  m_active.erase(it);
460  }
461 
462  m_active_cond.Broadcast();
463  }
464 
465  return file;
466 }
467 
469 {
470  // Called from virtual IO::DetachFinalize.
471 
472  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
473 
474  {
475  XrdSysCondVarHelper lock(&m_active_cond);
476 
477  f->RemoveIO(io);
478  }
479  dec_ref_cnt(f, true);
480 }
481 
482 
483 //==============================================================================
484 //==============================================================================
485 
486 namespace
487 {
488 
489 class DiskSyncer : public XrdJob
490 {
491 private:
492  File *m_file;
493  bool m_high_debug;
494 
495 public:
496  DiskSyncer(File *f, bool high_debug, const char *desc = "") :
497  XrdJob(desc),
498  m_file(f),
499  m_high_debug(high_debug)
500  {}
501 
502  void DoIt()
503  {
504  m_file->Sync();
505  Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
506  delete this;
507  }
508 };
509 
510 
511 class CommandExecutor : public XrdJob
512 {
513 private:
514  std::string m_command_url;
515 
516 public:
517  CommandExecutor(const std::string& command, const char *desc = "") :
518  XrdJob(desc),
519  m_command_url(command)
520  {}
521 
522  void DoIt()
523  {
524  Cache::GetInstance().ExecuteCommandUrl(m_command_url);
525  delete this;
526  }
527 };
528 
529 }
530 
531 //==============================================================================
532 
533 void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
534 {
535  DiskSyncer* ds = new DiskSyncer(f, high_debug);
536 
537  if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
538 
539  schedP->Schedule(ds);
540 }
541 
542 void Cache::FileSyncDone(File* f, bool high_debug)
543 {
544  dec_ref_cnt(f, high_debug);
545 }
546 
547 void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
548 {
549  // called from GetFile() or SheduleFileSync();
550 
551  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
552 
553  if (lock) m_active_cond.Lock();
554  int rc = f->inc_ref_cnt();
555  if (lock) m_active_cond.UnLock();
556 
557  TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
558 }
559 
560 void Cache::dec_ref_cnt(File* f, bool high_debug)
561 {
562  // Called from ReleaseFile() or DiskSync callback.
563 
564  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
565  int cnt;
566 
567  {
568  XrdSysCondVarHelper lock(&m_active_cond);
569 
570  cnt = f->get_ref_cnt();
571 
572  if (f->is_in_emergency_shutdown())
573  {
574  // In this case file has been already removed from m_active map and
575  // does not need to be synced.
576 
577  if (cnt == 1)
578  {
579  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
580  << " -- deleting File object without further ado");
581  delete f;
582  }
583  else
584  {
585  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
586  << " -- waiting");
587  }
588 
589  return;
590  }
591  }
592 
593  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
594 
595  if (cnt == 1)
596  {
597  if (f->FinalizeSyncBeforeExit())
598  {
599  // Note, here we "reuse" the existing reference count for the
600  // final sync.
601 
602  TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
603  schedule_file_sync(f, true, true);
604  return;
605  }
606  }
607 
608  bool finished_p = false;
609  {
610  XrdSysCondVarHelper lock(&m_active_cond);
611 
612  cnt = f->dec_ref_cnt();
613  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
614  if (cnt == 0)
615  {
616  ActiveMap_i it = m_active.find(f->GetLocalPath());
617  m_active.erase(it);
618 
619  finished_p = true;
620  }
621  }
622 
623  if (finished_p)
624  {
625  if (m_gstream)
626  {
627  const Stats &st = f->RefStats();
628  const Info::AStat *as = f->GetLastAccessStats();
629 
630  char buf[4096];
631  int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
632  "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
633  "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
634  "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,\"b_write\":%lld,\"n_cks_errs\":%d}",
635  f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
636  f->GetNBlocks(), f->GetNDownloadedBlocks(),
637  (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
638  f->GetRemoteLocations().c_str(),
640  );
641  bool suc = false;
642  if (len < 4096)
643  {
644  suc = m_gstream->Insert(buf, len + 1);
645  }
646  if ( ! suc)
647  {
648  TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
649  }
650  }
651 
652  delete f;
653  }
654 }
655 
656 bool Cache::IsFileActiveOrPurgeProtected(const std::string& path) const
657 {
658  XrdSysCondVarHelper lock(&m_active_cond);
659 
660  return m_active.find(path) != m_active.end() ||
661  m_purge_delay_set.find(path) != m_purge_delay_set.end();
662 }
663 
665 {
666  XrdSysCondVarHelper lock(&m_active_cond);
667  m_purge_delay_set.clear();
668 }
669 
670 //==============================================================================
671 //=== PREFETCH
672 //==============================================================================
673 
675 {
676  // Can be called with other locks held.
677 
678  if ( ! m_prefetch_enabled)
679  {
680  return;
681  }
682 
683  m_prefetch_condVar.Lock();
684  m_prefetchList.push_back(file);
685  m_prefetch_condVar.Signal();
686  m_prefetch_condVar.UnLock();
687 }
688 
689 
691 {
692  // Can be called with other locks held.
693 
694  if ( ! m_prefetch_enabled)
695  {
696  return;
697  }
698 
699  m_prefetch_condVar.Lock();
700  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
701  {
702  if (*it == file)
703  {
704  m_prefetchList.erase(it);
705  break;
706  }
707  }
708  m_prefetch_condVar.UnLock();
709 }
710 
711 
713 {
714  m_prefetch_condVar.Lock();
715  while (m_prefetchList.empty())
716  {
717  m_prefetch_condVar.Wait();
718  }
719 
720  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
721 
722  size_t l = m_prefetchList.size();
723  int idx = rand() % l;
724  File* f = m_prefetchList[idx];
725 
726  m_prefetch_condVar.UnLock();
727  return f;
728 }
729 
730 
732 {
733  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
734 
735  while (true)
736  {
737  m_RAM_mutex.Lock();
738  bool doPrefetch = (m_RAM_used < limit_RAM);
739  m_RAM_mutex.UnLock();
740 
741  if (doPrefetch)
742  {
744  f->Prefetch();
745  }
746  else
747  {
749  }
750  }
751 }
752 
753 
754 //==============================================================================
755 //=== Virtuals from XrdOucCache
756 //==============================================================================
757 
758 //------------------------------------------------------------------------------
772 
773 int Cache::LocalFilePath(const char *curl, char *buff, int blen,
774  LFP_Reason why, bool forall)
775 {
776  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
777  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
778  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
779 
780  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
781 
782  if (buff && blen > 0) buff[0] = 0;
783 
784  XrdCl::URL url(curl);
785  std::string f_name = url.GetPath();
786  std::string i_name = f_name + Info::s_infoExtension;
787 
788  if (why == ForPath)
789  {
790  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
791  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
792  return ret;
793  }
794 
795  {
796  XrdSysCondVarHelper lock(&m_active_cond);
797  m_purge_delay_set.insert(f_name);
798  }
799 
800  struct stat sbuff, sbuff2;
801  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
802  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
803  {
804  if (S_ISDIR(sbuff.st_mode))
805  {
806  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
807  return -EISDIR;
808  }
809  else
810  {
811  bool read_ok = false;
812  bool is_complete = false;
813 
814  // Lock and check if the file is active. If NOT, keep the lock
815  // and add dummy access after successful reading of info file.
816  // If it IS active, just release the lock, this ongoing access will
817  // assure the file continues to exist.
818 
819  // XXXX How can I just loop over the cinfo file when active?
820  // Can I not get is_complete from the existing file?
821  // Do I still want to inject access record?
822  // Oh, it writes only if not active .... still let's try to use existing File.
823 
824  m_active_cond.Lock();
825 
826  bool is_active = m_active.find(f_name) != m_active.end();
827 
828  if (is_active) m_active_cond.UnLock();
829 
830  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
831  XrdOucEnv myEnv;
832  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
833  if (res >= 0)
834  {
835  Info info(m_trace, 0);
836  if (info.Read(infoFile, i_name.c_str()))
837  {
838  read_ok = true;
839 
840  is_complete = info.IsComplete();
841 
842  // Add full-size access if reason is for access.
843  if ( ! is_active && is_complete && why == ForAccess)
844  {
845  info.WriteIOStatSingle(info.GetFileSize());
846  info.Write(infoFile, i_name.c_str());
847  }
848  }
849  infoFile->Close();
850  }
851  delete infoFile;
852 
853  if ( ! is_active) m_active_cond.UnLock();
854 
855  if (read_ok)
856  {
857  if ((is_complete || why == ForInfo) && buff != 0)
858  {
859  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
860  if (res2 < 0)
861  return res2;
862 
863  // Normally, files are owned by us but when direct cache access
864  // is wanted and possible, make sure the file is world readable.
865  if (why == ForAccess)
866  {mode_t mode = (forall ? worldReadable : groupReadable);
867  if (((sbuff.st_mode & worldReadable) != mode)
868  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
869  {is_complete = false;
870  *buff = 0;
871  }
872  }
873  }
874 
875  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
876  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
877 
878  return is_complete ? 0 : -EREMOTE;
879  }
880  }
881  }
882 
883  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
884  return -ENOENT;
885 }
886 
887 //______________________________________________________________________________
888 // Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
889 // pfc configuration parameters. The logic of accessing the Info file is the same
890 // as in Cache::LocalFilePath.
900 //------------------------------------------------------------------------------
901 int Cache::ConsiderCached(const char *curl)
902 {
903  TRACE(Debug, "ConsiderFileCached '" << curl << "'" );
904 
905  XrdCl::URL url(curl);
906  std::string f_name = url.GetPath();
907  std::string i_name = f_name + Info::s_infoExtension;
908 
909  {
910  XrdSysCondVarHelper lock(&m_active_cond);
911  m_purge_delay_set.insert(f_name);
912  }
913 
914  struct stat sbuff, sbuff2;
915  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
916  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
917  {
918  if (S_ISDIR(sbuff.st_mode))
919  {
920  TRACE(Info, "ConsiderCached '" << curl << ", why=ForInfo" << " -> EISDIR");
921  return -EISDIR;
922  }
923  else
924  {
925  bool read_ok = false;
926  bool is_cached = false;
927 
928  // Lock and check if the file is active. If NOT, keep the lock
929  // and add dummy access after successful reading of info file.
930  // If it IS active, just release the lock, this ongoing access will
931  // assure the file continues to exist.
932 
933  // XXXX How can I just loop over the cinfo file when active?
934  // Can I not get is_complete from the existing file?
935  // Do I still want to inject access record?
936  // Oh, it writes only if not active .... still let's try to use existing File.
937 
938  m_active_cond.Lock();
939 
940  bool is_active = m_active.find(f_name) != m_active.end();
941 
942  if (is_active)
943  m_active_cond.UnLock();
944 
945  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
946  XrdOucEnv myEnv;
947  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
948  if (res >= 0)
949  {
950  Info info(m_trace, 0);
951  if (info.Read(infoFile, i_name.c_str()))
952  {
953  read_ok = true;
954 
955  if (info.IsComplete())
956  {
957  is_cached = true;
958  }
959  else if (info.GetFileSize() == 0)
960  {
961  is_cached = true;
962  }
963  else
964  {
965  long long fileSize = info.GetFileSize();
966  long long bytesRead = info.GetNDownloadedBytes();
967 
968  if (fileSize < m_configuration.m_onlyIfCachedMinSize)
969  {
970  if ((float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
971  is_cached = true;
972  }
973  else
974  {
975  if (bytesRead > m_configuration.m_onlyIfCachedMinSize &&
976  (float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
977  is_cached = true;
978  }
979  }
980  }
981  infoFile->Close();
982  }
983  delete infoFile;
984 
985  if (!is_active) m_active_cond.UnLock();
986 
987  if (read_ok)
988  {
989  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << (is_cached ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
990  return is_cached ? 0 : -EREMOTE;
991  }
992  }
993  }
994 
995  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << " -> ENOENT");
996  return -ENOENT;
997 }
998 
999 //______________________________________________________________________________
1007 //------------------------------------------------------------------------------
1008 
1009 int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1010 {
1011  XrdCl::URL url(curl);
1012  std::string f_name = url.GetPath();
1013  std::string i_name = f_name + Info::s_infoExtension;
1014 
1015  // Do not allow write access.
1016  if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1017  {
1018  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1019  return -EROFS;
1020  }
1021 
1022  // Intercept xrdpfc_command requests.
1023  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1024  {
1025  // Schedule a job to process command request.
1026  {
1027  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1028 
1029  schedP->Schedule(ce);
1030  }
1031 
1032  return -EAGAIN;
1033  }
1034 
1035  {
1036  XrdSysCondVarHelper lock(&m_active_cond);
1037  m_purge_delay_set.insert(f_name);
1038  }
1039 
1040  struct stat sbuff;
1041  int res = m_oss->Stat(i_name.c_str(), &sbuff);
1042  if (res == 0)
1043  {
1044  TRACE(Dump, "Prepare defer open " << f_name);
1045  return 1;
1046  }
1047  else
1048  {
1049  return 0;
1050  }
1051 }
1052 
1053 //______________________________________________________________________________
1054 // virtual method of XrdOucCache.
1059 //------------------------------------------------------------------------------
1060 
1061 int Cache::Stat(const char *curl, struct stat &sbuff)
1062 {
1063  XrdCl::URL url(curl);
1064  std::string f_name = url.GetPath();
1065 
1066  {
1067  XrdSysCondVarHelper lock(&m_active_cond);
1068  m_purge_delay_set.insert(f_name);
1069  }
1070 
1071  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK)
1072  {
1073  if (S_ISDIR(sbuff.st_mode))
1074  {
1075  return 0;
1076  }
1077  else
1078  {
1079  bool success = false;
1080  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
1081  XrdOucEnv myEnv;
1082 
1083  f_name += Info::s_infoExtension;
1084  int res = infoFile->Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1085  if (res >= 0)
1086  {
1087  Info info(m_trace, 0);
1088  if (info.Read(infoFile, f_name.c_str()))
1089  {
1090  sbuff.st_size = info.GetFileSize();
1091  success = true;
1092  }
1093  }
1094  infoFile->Close();
1095  delete infoFile;
1096  return success ? 0 : 1;
1097  }
1098  }
1099 
1100  return 1;
1101 }
1102 
1103 //______________________________________________________________________________
1104 // virtual method of XrdOucCache.
1108 //------------------------------------------------------------------------------
1109 
1110 int Cache::Unlink(const char *curl)
1111 {
1112  XrdCl::URL url(curl);
1113  std::string f_name = url.GetPath();
1114 
1115  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1116 
1117  return UnlinkFile(f_name, false);
1118 }
1119 
1120 int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1121 {
1122  static const char* trc_pfx = "UnlinkFile ";
1123  ActiveMap_i it;
1124  File *file = 0;
1125  {
1126  XrdSysCondVarHelper lock(&m_active_cond);
1127 
1128  it = m_active.find(f_name);
1129 
1130  if (it != m_active.end())
1131  {
1132  if (fail_if_open)
1133  {
1134  TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1135  return -EBUSY;
1136  }
1137 
1138  // Null File* in m_active map means an operation is ongoing, probably
1139  // Attach() with possible File::Open(). Ask for retry.
1140  if (it->second == 0)
1141  {
1142  TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1143  return -EAGAIN;
1144  }
1145 
1146  file = it->second;
1148  it->second = 0;
1149  }
1150  else
1151  {
1152  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1153  }
1154  }
1155 
1156  if (file)
1157  {
1158  RemoveWriteQEntriesFor(file);
1159  }
1160 
1161  std::string i_name = f_name + Info::s_infoExtension;
1162 
1163  // Unlink file & cinfo
1164  struct stat f_stat;
1165  bool stat_ok = (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK);
1166  int f_ret = m_oss->Unlink(f_name.c_str());
1167  int i_ret = m_oss->Unlink(i_name.c_str());
1168 
1169  if (stat_ok)
1170  m_res_mon->register_file_purge(f_name, f_stat.st_blocks);
1171 
1172  TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1173 
1174  {
1175  XrdSysCondVarHelper lock(&m_active_cond);
1176 
1177  m_active.erase(it);
1178  }
1179 
1180  return std::min(f_ret, i_ret);
1181 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define XrdOssOK
Definition: XrdOss.hh:50
std::string obfuscateAuth(const std::string &input)
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:55
#define TRACE_INT(act, x)
Definition: XrdPfcTrace.hh:47
void * ResourceMonitorThread(void *)
Definition: XrdPfc.cc:52
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:74
void * PrefetchThread(void *)
Definition: XrdPfc.cc:64
void * ProcessWriteTaskThread(void *)
Definition: XrdPfc.cc:58
int stat(const char *path, struct stat *buf)
@ Warning
#define TRACE(act, x)
Definition: XrdTrace.hh:63
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
Definition: XrdJob.hh:43
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual const char * Path()=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
void PutPtr(const char *varname, void *value)
Definition: XrdOucEnv.cc:298
int get_size() const
Definition: XrdPfcFile.hh:141
File * get_file() const
Definition: XrdPfcFile.hh:145
long long m_offset
Definition: XrdPfcFile.hh:119
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:151
void FileSyncDone(File *, bool high_debug)
Definition: XrdPfc.cc:542
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:387
static const Configuration & Conf()
Definition: XrdPfc.cc:132
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition: XrdPfc.cc:773
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1061
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:199
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:133
bool IsFileActiveOrPurgeProtected(const std::string &) const
Definition: XrdPfc.cc:656
void ClearPurgeProtectedSet()
Definition: XrdPfc.cc:664
void ReleaseRAM(char *buf, long long size)
Definition: XrdPfc.cc:369
virtual int ConsiderCached(const char *url)
Definition: XrdPfc.cc:901
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:130
void DeRegisterPrefetchFile(File *)
Definition: XrdPfc.cc:690
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition: XrdPfc.cc:674
void Prefetch()
Definition: XrdPfc.cc:731
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:468
void AddWriteTask(Block *b, bool from_read)
Add a downloaded block to the write queue.
Definition: XrdPfc.cc:219
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:156
bool Decide(XrdOucCacheIO *)
Decides whether the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:135
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1120
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
Obtain a new IO object that fronts existing XrdOucCacheIO.
static XrdScheduler * schedP
Definition: XrdPfc.hh:285
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:712
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:280
long long WritesSinceLastCall()
Definition: XrdPfc.cc:318
void ProcessWriteTasks()
Separate task which writes blocks from RAM to disk.
Definition: XrdPfc.cc:271
virtual int Unlink(const char *url)
Definition: XrdPfc.cc:1110
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:238
static const Cache & TheOne()
Definition: XrdPfc.cc:131
char * RequestRAM(long long size)
Definition: XrdPfc.cc:329
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition: XrdPfc.cc:1009
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition: XrdPfc.cc:123
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:286
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1470
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1008
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:108
int GetNBlocks() const
Definition: XrdPfcFile.hh:285
void Prefetch()
Definition: XrdPfcFile.cc:1485
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1582
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:282
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:283
void AddIO(IO *io)
Definition: XrdPfcFile.cc:310
int GetBlockSize() const
Definition: XrdPfcFile.hh:284
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:286
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:180
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:121
int inc_ref_cnt()
Definition: XrdPfcFile.hh:291
void Sync()
Sync file cache info and output data with disk.
Definition: XrdPfcFile.cc:1091
int dec_ref_cnt()
Definition: XrdPfcFile.hh:292
int get_ref_cnt()
Definition: XrdPfcFile.hh:290
long long GetFileSize() const
Definition: XrdPfcFile.hh:274
const Stats & RefStats() const
Definition: XrdPfcFile.hh:287
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:347
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:269
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:295
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:48
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
Definition: XrdPfcInfo.cc:446
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
Definition: XrdPfcInfo.hh:407
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:447
long long GetFileSize() const
Get file size.
Definition: XrdPfcInfo.hh:442
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:296
void register_file_purge(DirState *target, long long size_in_st_blocks)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
long long m_BytesMissed
number of bytes served from remote and cached
Definition: XrdPfcStats.hh:40
long long m_BytesBypassed
number of bytes served directly through XrdCl
Definition: XrdPfcStats.hh:41
int m_NCksumErrors
number of checksum errors while getting data from remote
Definition: XrdPfcStats.hh:44
long long m_BytesHit
number of bytes served from disk
Definition: XrdPfcStats.hh:39
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
void Schedule(XrdJob *jp)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
bool Insert(const char *data, int dlen)
Definition: XrdPfc.hh:41
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:108
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:85
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:109
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:107
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:110
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:122
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:121
Access statistics.
Definition: XrdPfcInfo.hh:57
time_t DetachTime
close time
Definition: XrdPfcInfo.hh:59
time_t AttachTime
open time
Definition: XrdPfcInfo.hh:58