XRootD
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor. More...
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue. More...
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *, int Options=0)
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *ioP, int opts=0)=0
 Obtain a new IO object that fronts existing XrdOucCacheIO. More...
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters)
 Parse configuration file. More...
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached. More...
 
void DeRegisterPrefetchFile (File *)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
File * GetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStream * GetGStream ()
 
XrdSysError * GetLog ()
 
File * GetNextFileToPrefetch ()
 
XrdOss * GetOss () const
 
PurgePin * GetPurgePin () const
 
XrdSysTrace * GetTrace ()
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk. More...
 
const Configuration & RefConfiguration () const
 Reference XrdPfc configuration. More...
 
ResourceMonitor & RefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction. More...
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache. More...
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor. More...
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const Configuration & Conf ()
 
static Cache & CreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation. More...
 
static Cache & GetInstance ()
 Singleton access. More...
 
static ResourceMonitor & ResMon ()
 
static const Cache & TheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check. More...
 

Static Public Attributes

static XrdScheduler * schedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file) More...
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write) More...
 
static const int optRW = 0x0004
 File is read/write (o/w read/only) More...
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache. More...
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc). More...
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 150 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env 
)

Constructor.

Definition at line 156 of file XrdPfc.cc.

156  :
157  XrdOucCache("pfc"),
158  m_env(env),
159  m_log(logger, "XrdPfc_"),
160  m_trace(new XrdSysTrace("XrdPfc", logger)),
161  m_traceID("Cache"),
162  m_oss(0),
163  m_gstream(0),
164  m_purge_pin(0),
165  m_prefetch_condVar(0),
166  m_prefetch_enabled(false),
167  m_RAM_used(0),
168  m_RAM_write_queue(0),
169  m_RAM_std_size(0),
170  m_isClient(false),
171  m_active_cond(0)
172 {
173  // Default log level is Warning.
174  m_trace->What = 2;
175 }
XrdOucCache(const char *ctype)
Definition: XrdOucCache.hh:700

References XrdSysTrace::What.

Referenced by CreateInstance().

+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool  from_read 
)

Add downloaded block in write queue.

Definition at line 219 of file XrdPfc.cc.

220 {
221  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
222 
223  {
224  XrdSysMutexHelper lock(&m_RAM_mutex);
225  m_RAM_write_queue += b->get_size();
226  }
227 
228  m_writeQ.condVar.Lock();
229  if (fromRead)
230  m_writeQ.queue.push_back(b);
231  else
232  m_writeQ.queue.push_front(b);
233  m_writeQ.size++;
234  m_writeQ.condVar.Signal();
235  m_writeQ.condVar.UnLock();
236 }
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int get_size() const
Definition: XrdPfcFile.hh:141
File * get_file() const
Definition: XrdPfcFile.hh:145
long long m_offset
Definition: XrdPfcFile.hh:119
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:269

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach() [1/2]

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int  Options = 0 
)
virtual

Implements XrdOucCache.

Definition at line 177 of file XrdPfc.cc.

178 {
179  const char* tpfx = "Attach() ";
180 
181  if (Cache::GetInstance().Decide(io))
182  {
183  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
184 
185  IO *cio;
186 
187  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
188  {
189  cio = new IOFileBlock(io, *this);
190  }
191  else
192  {
193  IOFile *iof = new IOFile(io, *this);
194 
195  if ( ! iof->HasFile())
196  {
197  delete iof;
198  // TODO - redirect instead. But this is kind of an awkward place for it.
199  // errno is set during IOFile construction.
200  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
201  return io;
202  }
203 
204  cio = iof;
205  }
206 
207  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
208  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
209 
210  return cio;
211  }
212  else
213  {
214  TRACE(Info, tpfx << "decision decline " << io->Path());
215  }
216  return io;
217 }
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:55
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:199
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:130
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:135
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:48
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41

References Macaroons::Debug, Decide(), Macaroons::Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ Attach() [2/2]

virtual XrdOucCacheIO* XrdOucCache::Attach

Obtain a new IO object that fronts existing XrdOucCacheIO.

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 664 of file XrdPfc.cc.

665 {
666  XrdSysCondVarHelper lock(&m_active_cond);
667  m_purge_delay_set.clear();
668 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 132 of file XrdPfc.cc.

132 { return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::FPurgeState::CheckFile(), XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), and XrdPfc::ResourceMonitor::update_vs_and_file_usage_info().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char *  config_filename,
const char *  parameters 
)

Parse configuration file.

Parameters
config_filename - path to configuration file
parameters - optional parameters to be passed
Returns
parse status

Definition at line 329 of file XrdPfcConfiguration.cc.

330 {
331  // Indicate whether or not we are a client instance
332  const char *theINS = getenv("XRDINSTANCE");
333  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
334 
335  // Tell everyone else we are a caching proxy
336  XrdOucEnv::Export("XRDPFC", 1);
337 
338  XrdOucEnv myEnv;
339  XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
340 
341  if (! config_filename || ! *config_filename)
342  {
343  TRACE(Error, "Config() configuration file not specified.");
344  return false;
345  }
346 
347  int fd;
348  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
349  {
350  TRACE( Error, "Config() can't open configuration file " << config_filename);
351  return false;
352  }
353 
354  Config.Attach(fd);
355  static const char *cvec[] = { "*** pfc plugin config:", 0 };
356  Config.Capture(cvec);
357 
358  // Obtain OFS configurator for OSS plugin.
359  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
360  &XrdVERSIONINFOVAR(XrdOucGetCache));
361  if (! ofsCfg) return false;
362 
363  TmpConfiguration tmpc;
364 
365  // Adjust default parameters for client/serverless caching
366  if (m_isClient)
367  {
368  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
369  m_configuration.m_wqueue_blocks = 8;
370  m_configuration.m_wqueue_threads = 1;
371  }
372 
373  // If network checksum processing is the default, indicate so.
374  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
375 
376  // Actual parsing of the config file.
377  bool retval = true, aOK = true;
378  char *var;
379  while ((var = Config.GetMyFirstWord()))
380  {
381  if (! strcmp(var,"pfc.osslib"))
382  {
383  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
384  }
385  else if (! strcmp(var,"pfc.cschk"))
386  {
387  retval = xcschk(Config);
388  }
389  else if (! strcmp(var,"pfc.decisionlib"))
390  {
391  retval = xdlib(Config);
392  }
393  else if (! strcmp(var,"pfc.purgelib"))
394  {
395  retval = xplib(Config);
396  }
397  else if (! strcmp(var,"pfc.trace"))
398  {
399  retval = xtrace(Config);
400  }
401  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
402  {
403  m_configuration.m_allow_xrdpfc_command = true;
404  }
405  else if (! strncmp(var,"pfc.", 4))
406  {
407  retval = ConfigParameters(std::string(var+4), Config, tmpc);
408  }
409 
410  if ( ! retval)
411  {
412  TRACE(Error, "Config() error in parsing");
413  aOK = false;
414  }
415  }
416 
417  Config.Close();
418 
419  // Load OSS plugin.
420  myEnv.Put("oss.runmode", "pfc");
421  if (m_configuration.is_cschk_cache())
422  {
423  char csi_conf[128];
424  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
425  {
426  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
427  } else {
428  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
429  return false;
430  }
431  }
432  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
433  {
434  ofsCfg->Plugin(m_oss);
435  }
436  else
437  {
438  TRACE(Error, "Config() Unable to create an OSS object");
439  return false;
440  }
441 
442  // sets default value for disk usage
443  XrdOssVSInfo sP;
444  {
445  if (m_configuration.m_meta_space != m_configuration.m_data_space &&
446  m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
447  {
448  m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
449  return false;
450  }
451  if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
452  {
453  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
454  m_configuration.m_meta_space.c_str());
455  return false;
456  }
457  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
458  {
459  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
460  return false;
461  }
462  if (sP.Total < 10ll << 20)
463  {
464  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
465  m_configuration.m_data_space.c_str());
466  return false;
467  }
468 
469  m_configuration.m_diskTotalSpace = sP.Total;
470 
471  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
472  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
473  {
474  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
475  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
476  aOK = false;
477  }
478  }
479  else aOK = false;
480 
481  if ( ! tmpc.m_fileUsageMax.empty())
482  {
483  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
484  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
485  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
486  {
487  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
488  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
489  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
490  {
491  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
492  aOK = false;
493  }
494 
495 
496  if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
497  {
498  m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
499  aOK = false;
500  }
501  }
502  else aOK = false;
503  }
504  }
505  // sets flush frequency
506  if ( ! tmpc.m_flushRaw.empty())
507  {
508  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
509  {
510  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
511  &m_configuration.m_flushCnt,
512  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
513  {
514  return false;
515  }
516  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
517  }
518  else
519  {
520  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
521  &m_configuration.m_flushCnt, 100, 100000))
522  {
523  return false;
524  }
525  }
526  }
527 
528  // get number of available RAM blocks after process configuration
529  if (m_configuration.m_RamAbsAvailable == 0)
530  {
531  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
532  char buff[1024];
533  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
534  m_log.Say("Config info: ", buff);
535  }
536  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
537  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
538 
539 
540  // Set tracing to debug if this is set in environment
541  char* cenv = getenv("XRDDEBUG");
542  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
543 
544  if (aOK)
545  {
546  int loff = 0;
547 // 000 001 010
548  const char *csc[] = {"off", "cache nonet", "nocache net notls",
549 // 011
550  "cache net notls",
551 // 100 101 110
552  "off", "cache nonet", "nocache net tls",
553 // 111
554  "cache net tls"};
555  char buff[8192], uvk[32];
556  if (m_configuration.m_cs_UVKeep < 0)
557  strcpy(uvk, "lru");
558  else
559  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
560  float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
561  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
562  " pfc.cschk %s uvkeep %s\n"
563  " pfc.blocksize %lld\n"
564  " pfc.prefetch %d\n"
565  " pfc.ram %.fg\n"
566  " pfc.writequeue %d %d\n"
567  " # Total available disk: %lld\n"
568  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
569  " pfc.spaces %s %s\n"
570  " pfc.trace %d\n"
571  " pfc.flush %lld\n"
572  " pfc.acchistorysize %d\n"
573  " pfc.onlyIfCachedMinBytes %lld\n"
574  " pfc.onlyIfCachedMinFrac %.2f\n",
575  config_filename,
576  csc[int(m_configuration.m_cs_Chk)], uvk,
577  m_configuration.m_bufferSize,
578  m_configuration.m_prefetch_max_blocks,
579  rg,
580  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
581  sP.Total,
582  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
583  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
584  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
585  m_configuration.m_data_space.c_str(),
586  m_configuration.m_meta_space.c_str(),
587  m_trace->What,
588  m_configuration.m_flushCnt,
589  m_configuration.m_accHistorySize,
590  m_configuration.m_onlyIfCachedMinSize,
591  m_configuration.m_onlyIfCachedMinFrac);
592 
593  if (m_configuration.is_dir_stat_reporting_on())
594  {
595  loff += snprintf(buff + loff, sizeof(buff) - loff,
596  " pfc.dirstats maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
597  m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
598  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
599  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
600  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
601  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
602  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
603  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
604  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
605  }
606 
607  if (m_configuration.m_hdfsmode)
608  {
609  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
610  }
611 
612  if (m_configuration.m_username.empty())
613  {
614  char unameBuff[256];
615  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
616  m_configuration.m_username = unameBuff;
617  }
618  else
619  {
620  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
621  }
622 
623  m_log.Say(buff);
624 
625  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
626  }
627 
628  // Derived settings
629  m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
630  Info::s_maxNumAccess = m_configuration.m_accHistorySize;
631 
632  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
633 
634  m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
635 
636  // Create the ResourceMonitor and get it ready for starting the main thread function.
637  if (aOK)
638  {
639  m_res_mon = new ResourceMonitor(*m_oss);
640  m_res_mon->init_before_main();
641  }
642 
643  m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
644 
645  if (ofsCfg) delete ofsCfg;
646 
647  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
648  // Building of xrdpfc_print fails when this is enabled.
649 #ifdef XRDPFC_CKSUM_TEST
650  {
651  int xxx = m_configuration.m_cs_Chk;
652 
653  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
654  {
655  Info::TestCksumStuff();
656  }
657 
658  m_configuration.m_cs_Chk = xxx;
659  }
660 #endif
661 
662  return aOK;
663 }
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:74
int open(const char *path, int oflag,...)
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition: XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:311
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_None
Definition: XrdPfcTypes.hh:27
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:114
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:108
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:115
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:100
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:111
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:105
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:85
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool is_cschk_cache() const
Definition: XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:103
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:119
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:118
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:102
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:109
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:110
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
bool is_cschk_net() const
Definition: XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:122
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:117
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:104
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:121
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::string m_diskUsageLWM
Definition: XrdPfc.hh:129
std::string m_diskUsageHWM
Definition: XrdPfc.hh:130
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:131
std::string m_fileUsageNominal
Definition: XrdPfc.hh:132
std::string m_flushRaw
Definition: XrdPfc.hh:134
std::string m_fileUsageMax
Definition: XrdPfc.hh:133

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Macaroons::Error, XrdOucEnv::Export(), XrdOucEnv::GetPtr(), XrdPfc::ResourceMonitor::init_before_main(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsMaxDepth, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdOfsConfigPI::New(), open(), XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), 
XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char *  curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Definition at line 901 of file XrdPfc.cc.

902 {
903  TRACE(Debug, "ConsiderFileCached '" << curl << "'" );
904 
905  XrdCl::URL url(curl);
906  std::string f_name = url.GetPath();
907  std::string i_name = f_name + Info::s_infoExtension;
908 
909  {
910  XrdSysCondVarHelper lock(&m_active_cond);
911  m_purge_delay_set.insert(f_name);
912  }
913 
914  struct stat sbuff, sbuff2;
915  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
916  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
917  {
918  if (S_ISDIR(sbuff.st_mode))
919  {
920  TRACE(Info, "ConsiderCached '" << curl << ", why=ForInfo" << " -> EISDIR");
921  return -EISDIR;
922  }
923  else
924  {
925  bool read_ok = false;
926  bool is_cached = false;
927 
928  // Lock and check if the file is active. If NOT, keep the lock
929  // and add dummy access after successful reading of info file.
930  // If it IS active, just release the lock, this ongoing access will
931  // assure the file continues to exist.
932 
933  // XXXX How can I just loop over the cinfo file when active?
934  // Can I not get is_complete from the existing file?
935  // Do I still want to inject access record?
936  // Oh, it writes only if not active .... still let's try to use existing File.
937 
938  m_active_cond.Lock();
939 
940  bool is_active = m_active.find(f_name) != m_active.end();
941 
942  if (is_active)
943  m_active_cond.UnLock();
944 
945  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
946  XrdOucEnv myEnv;
947  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
948  if (res >= 0)
949  {
950  Info info(m_trace, 0);
951  if (info.Read(infoFile, i_name.c_str()))
952  {
953  read_ok = true;
954 
955  if (info.IsComplete())
956  {
957  is_cached = true;
958  }
959  else if (info.GetFileSize() == 0)
960  {
961  is_cached = true;
962  }
963  else
964  {
965  long long fileSize = info.GetFileSize();
966  long long bytesRead = info.GetNDownloadedBytes();
967 
968  if (fileSize < m_configuration.m_onlyIfCachedMinSize)
969  {
970  if ((float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
971  is_cached = true;
972  }
973  else
974  {
975  if (bytesRead > m_configuration.m_onlyIfCachedMinSize &&
976  (float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
977  is_cached = true;
978  }
979  }
980  }
981  infoFile->Close();
982  }
983  delete infoFile;
984 
985  if (!is_active) m_active_cond.UnLock();
986 
987  if (read_ok)
988  {
989  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << (is_cached ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
990  return is_cached ? 0 : -EREMOTE;
991  }
992  }
993  }
994 
995  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << " -> ENOENT");
996  return -ENOENT;
997 }
#define XrdOssOK
Definition: XrdOss.hh:50
int stat(const char *path, struct stat *buf)
URL representation.
Definition: XrdClURL.hh:31
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309

References XrdOssDF::Close(), Macaroons::Debug, XrdPfc::Info::GetFileSize(), XrdPfc::Info::GetNDownloadedBytes(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger logger,
XrdOucEnv env 
)
static

Singleton creation.

Definition at line 123 of file XrdPfc.cc.

124 {
125  assert (m_instance == 0);
126  m_instance = new Cache(logger, env);
127  return *m_instance;
128 }
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:156

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO io)

Decides whether the original XrdOucCacheIO should be cached.

Parameters
io	IO object whose URL path is passed to each configured decision plugin
Returns
true if the IO object should be cached (no decision plugin rejects its path), false otherwise.

Definition at line 135 of file XrdPfc.cc.

136 {
137  if (! m_decisionpoints.empty())
138  {
139  XrdCl::URL url(io->Path());
140  std::string filename = url.GetPath();
141  std::vector<Decision*>::const_iterator it;
142  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
143  {
144  XrdPfc::Decision *d = *it;
145  if (! d) continue;
146  if (! d->Decide(filename, *m_oss))
147  {
148  return false;
149  }
150  }
151  }
152 
153  return true;
154 }
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File file)

Definition at line 690 of file XrdPfc.cc.

691 {
692  // Can be called with other locks held.
693 
694  if ( ! m_prefetch_enabled)
695  {
696  return;
697  }
698 
699  m_prefetch_condVar.Lock();
700  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
701  {
702  if (*it == file)
703  {
704  m_prefetchList.erase(it);
705  break;
706  }
707  }
708  m_prefetch_condVar.UnLock();
709 }

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string &  command_url)

Definition at line 50 of file XrdPfcCommand.cc.

51 {
52  static const char *top_epfx = "ExecuteCommandUrl ";
53 
54  SplitParser cp(command_url, "/");
55 
56  std::string token = cp.get_token();
57 
58  if (token != "xrdpfc_command")
59  {
60  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
61  return;
62  }
63 
64  // Get the command
65  token = cp.get_token();
66 
67 
68  //================================================================
69  // create_file
70  //================================================================
71 
72  if (token == "create_file")
73  {
74  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
75  static const char* usage =
76  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
77  " Creates a cache file with given parameters. Data in file is random.\n"
78  " Useful for cache purge testing.\n"
79  "Notes:\n"
80  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
81  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
82  " . -t and -d can be given multiple times to record several accesses.\n"
83  " . Negative arguments given to -t are interpreted as relative to now.\n";
84 
85  const Configuration &conf = m_configuration;
86 
87  token = cp.get_token();
88 
89  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
90 
91  std::vector<char*> argv;
92  SplitParser ap(token, " ");
93  int argc = ap.fill_argv(argv);
94 
95  long long file_size = ONE_GB;
96  long long block_size = conf.m_bufferSize;
97  int access_time [MAX_ACCESSES];
98  int access_duration[MAX_ACCESSES];
99  int at_count = 0, ad_count = 0;
100  XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
101  "help", 1, "h",
102  "verbose", 1, "v",
103  "size", 1, "s",
104  "blocksize", 1, "b",
105  "time", 1, "t",
106  "duration", 1, "d",
107  (const char *) 0);
108 
109  time_t time_now = time(0);
110 
111  Spec.Set(argc, &argv[0]);
112  char theOpt;
113 
114  while ((theOpt = Spec.getopt()) != (char) -1)
115  {
116  switch (theOpt)
117  {
118  case 'h': {
119  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
120  return;
121  }
122  case 's': {
123  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
124  &file_size, 0ll, 32 * ONE_GB))
125  return;
126  break;
127  }
128  case 'b': {
129  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
130  &block_size, 0ll, 64 * ONE_MB))
131  return;
132  break;
133  }
134  case 't': {
135  if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
136  &access_time[at_count++], INT_MIN, INT_MAX))
137  return;
138  break;
139  }
140  case 'd': {
141  if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
142  &access_duration[ad_count++], 0, 24 * 3600))
143  return;
144  break;
145  }
146  default: {
147  TRACE(Error, err_prefix << "Unhandled command argument.");
148  return;
149  }
150  }
151  }
152  if (Spec.getarg())
153  {
154  TRACE(Error, err_prefix << "Options must take up all the arguments.");
155  return;
156  }
157 
158  if (at_count < 1) access_time [at_count++] = time_now - 10;
159  if (ad_count < 1) access_duration[ad_count++] = 10;
160 
161  if (at_count != ad_count)
162  {
163  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
164  return;
165  }
166 
167  std::string file_path (cp.get_reminder_with_delim());
168  std::string cinfo_path(file_path + Info::s_infoExtension);
169 
170  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
171 
172  // Check if cinfo exists ... bail out if it does.
173  {
174  struct stat infoStat;
175  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
176  {
177  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
178  return;
179  }
180  }
181 
182  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
183 
184  {
185  const char *myUser = conf.m_username.c_str();
186  XrdOucEnv myEnv;
187 
188  // Create the data file.
189 
190  char size_str[32]; sprintf(size_str, "%lld", file_size);
191  myEnv.Put("oss.asize", size_str);
192  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
193  int cret;
194  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
195  {
196  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
197  return;
198  }
199 
200  XrdOssDF *myFile = GetOss()->newFile(myUser);
201  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
202  {
203  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
204  delete myFile;
205  return;
206  }
207 
208  // Create the info file.
209 
210  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
211  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
212  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
213  {
214  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
215  myFile->Close(); delete myFile;
216  return;
217  }
218 
219  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
220  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
221  {
222  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
223  delete myInfoFile;
224  myFile->Close(); delete myFile;
225  return;
226  }
227 
228  // Allocate space for the data file.
229 
230  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
231  {
232  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
233  }
234 
235  // Fill up cinfo.
236 
237  Info myInfo(m_trace, false);
238  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
239  myInfo.SetAllBitsSynced();
240 
241  for (int i = 0; i < at_count; ++i)
242  {
243  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
244 
245  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
246  }
247 
248  myInfo.Write(myInfoFile, cinfo_path.c_str());
249 
250  // Fake last modified time to the last access_time
251  {
252  time_t last_detach;
253  myInfo.GetLatestDetachTime(last_detach);
254  struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
255 
256  futimens(myInfoFile->getFD(), acc_mod_time);
257  }
258 
259  myInfoFile->Close(); delete myInfoFile;
260  myFile->Close(); delete myFile;
261 
262  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
263 
264  {
265  XrdSysCondVarHelper lock(&m_writeQ.condVar);
266 
267  m_writeQ.writes_between_purges += file_size;
268  }
269  {
270  int token = m_res_mon->register_file_open(file_path, time_now, false);
271  XrdPfc::Stats stats;
272  stats.m_BytesWritten = file_size;
273  stats.m_StBlocksAdded = (file_size & 0x1ff) ? (file_size >> 9) + 1 : file_size >> 9;
274  m_res_mon->register_file_update_stats(token, stats);
275  m_res_mon->register_file_close(token, time(0), stats);
276  }
277  }
278  }
279 
280  //================================================================
281  // remove_file
282  //================================================================
283 
284  else if (token == "remove_file")
285  {
286  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
287  static const char* usage =
288  "Usage: remove_file/ [-h] /<path>\n"
289  " Removes given file from the cache unless it is currently open.\n"
290  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
291  "Notes:\n"
292  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
293 
294  token = cp.get_token();
295 
296  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
297 
298  std::vector<char*> argv;
299  SplitParser ap(token, " ");
300  int argc = ap.fill_argv(argv);
301 
302  XrdOucArgs Spec(&m_log, err_prefix, "h",
303  "help", 1, "h",
304  (const char *) 0);
305 
306  Spec.Set(argc, &argv[0]);
307  char theOpt;
308 
309  while ((theOpt = Spec.getopt()) != (char) -1)
310  {
311  switch (theOpt)
312  {
313  case 'h': {
314  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
315  return;
316  }
317  default: {
318  TRACE(Error, err_prefix << "Unhandled command argument.");
319  return;
320  }
321  }
322  }
323  if (Spec.getarg())
324  {
325  TRACE(Error, err_prefix << "Options must take up all the arguments.");
326  return;
327  }
328 
329  std::string f_name(cp.get_reminder_with_delim());
330 
331  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
332 
333  int ret = UnlinkFile(f_name, true);
334 
335  TRACE(Info, err_prefix << "returned with status " << ret);
336  }
337 
338  //================================================================
339  // unknown command
340  //================================================================
341 
342  else
343  {
344  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
345  }
346 }
void usage()
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:41
bool Create
virtual int getFD()
Definition: XrdOss.hh:426
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
XrdOss * GetOss() const
Definition: XrdPfc.hh:263
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1061
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1120
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Macaroons::Debug, ERRNO_AND_ERRSTR, Macaroons::Error, XrdPfc::SplitParser::fill_argv(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdOucArgs::getarg(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), XrdOucArgs::getopt(), GetOss(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, MAX_ACCESSES, XrdOss::newFile(), ONE_GB, ONE_MB, XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::ResourceMonitor::register_file_close(), XrdPfc::ResourceMonitor::register_file_open(), XrdPfc::ResourceMonitor::register_file_update_stats(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdOucArgs::Set(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), stat(), Stat(), TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File f,
bool  high_debug 
)

Definition at line 542 of file XrdPfc.cc.

543 {
544  dec_ref_cnt(f, high_debug);
545 }

◆ GetFile()

File * Cache::GetFile ( const std::string &  path,
IO io,
long long  off = 0,
long long  filesize = 0 
)

Definition at line 387 of file XrdPfc.cc.

388 {
389  // Called from virtual IO::Attach
390 
391  TRACE(Debug, "GetFile " << path << ", io " << io);
392 
393  ActiveMap_i it;
394 
395  {
396  XrdSysCondVarHelper lock(&m_active_cond);
397 
398  while (true)
399  {
400  it = m_active.find(path);
401 
402  // File is not open or being opened. Mark it as being opened and
403  // proceed to opening it outside of while loop.
404  if (it == m_active.end())
405  {
406  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
407  break;
408  }
409 
410  if (it->second != 0)
411  {
412  it->second->AddIO(io);
413  inc_ref_cnt(it->second, false, true);
414 
415  return it->second;
416  }
417  else
418  {
419  // Wait for some change in m_active, then recheck.
420  m_active_cond.Wait();
421  }
422  }
423  }
424 
425  if (filesize == 0)
426  {
427  struct stat st;
428  int res = io->Fstat(st);
429  if (res < 0) {
430  errno = res;
431  TRACE(Error, "GetFile, could not get valid stat");
432  } else if (res > 0) {
433  errno = ENOTSUP;
434  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
435  } else {
436  filesize = st.st_size;
437  }
438  }
439 
440  File *file = 0;
441 
442  if (filesize >= 0)
443  {
444  file = File::FileOpen(path, off, filesize);
445  }
446 
447  {
448  XrdSysCondVarHelper lock(&m_active_cond);
449 
450  if (file)
451  {
452  inc_ref_cnt(file, false, true);
453  it->second = file;
454 
455  file->AddIO(io);
456  }
457  else
458  {
459  m_active.erase(it);
460  }
461 
462  m_active_cond.Broadcast();
463  }
464 
465  return file;
466 }
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:108
void AddIO(IO *io)
Definition: XrdPfcFile.cc:310

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Macaroons::Debug, Macaroons::Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), stat(), TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream* XrdPfc::Cache::GetGStream ( )
inline

Definition at line 281 of file XrdPfc.hh.

281 { return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 130 of file XrdPfc.cc.

130 { return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError* XrdPfc::Cache::GetLog ( )
inline

Definition at line 277 of file XrdPfc.hh.

277 { return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 712 of file XrdPfc.cc.

713 {
714  m_prefetch_condVar.Lock();
715  while (m_prefetchList.empty())
716  {
717  m_prefetch_condVar.Wait();
718  }
719 
720  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
721 
722  size_t l = m_prefetchList.size();
723  int idx = rand() % l;
724  File* f = m_prefetchList[idx];
725 
726  m_prefetch_condVar.UnLock();
727  return f;
728 }

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss* XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 263 of file XrdPfc.hh.

263 { return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin* XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 267 of file XrdPfc.hh.

267 { return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace* XrdPfc::Cache::GetTrace ( )
inline

Definition at line 278 of file XrdPfc.hh.

278 { return m_trace; }

Referenced by XrdPfc::File::GetTrace(), and XrdPfc::IO::GetTrace().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string &  path) const

Definition at line 656 of file XrdPfc.cc.

657 {
658  XrdSysCondVarHelper lock(&m_active_cond);
659 
660  return m_active.find(path) != m_active.end() ||
661  m_purge_delay_set.find(path) != m_purge_delay_set.end();
662 }

◆ LocalFilePath()

int Cache::LocalFilePath ( const char *  curl,
char *  buff = 0,
int  blen = 0,
LFP_Reason  why = ForAccess,
bool  forall = false 
)
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 773 of file XrdPfc.cc.

775 {
776  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
777  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
778  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
779 
780  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
781 
782  if (buff && blen > 0) buff[0] = 0;
783 
784  XrdCl::URL url(curl);
785  std::string f_name = url.GetPath();
786  std::string i_name = f_name + Info::s_infoExtension;
787 
788  if (why == ForPath)
789  {
790  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
791  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
792  return ret;
793  }
794 
795  {
796  XrdSysCondVarHelper lock(&m_active_cond);
797  m_purge_delay_set.insert(f_name);
798  }
799 
800  struct stat sbuff, sbuff2;
801  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
802  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
803  {
804  if (S_ISDIR(sbuff.st_mode))
805  {
806  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
807  return -EISDIR;
808  }
809  else
810  {
811  bool read_ok = false;
812  bool is_complete = false;
813 
814  // Lock and check if the file is active. If NOT, keep the lock
815  // and add dummy access after successful reading of info file.
816  // If it IS active, just release the lock, this ongoing access will
817  // assure the file continues to exist.
818 
819  // XXXX How can I just loop over the cinfo file when active?
820  // Can I not get is_complete from the existing file?
821  // Do I still want to inject access record?
822  // Oh, it writes only if not active .... still let's try to use existing File.
823 
824  m_active_cond.Lock();
825 
826  bool is_active = m_active.find(f_name) != m_active.end();
827 
828  if (is_active) m_active_cond.UnLock();
829 
830  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
831  XrdOucEnv myEnv;
832  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
833  if (res >= 0)
834  {
835  Info info(m_trace, 0);
836  if (info.Read(infoFile, i_name.c_str()))
837  {
838  read_ok = true;
839 
840  is_complete = info.IsComplete();
841 
842  // Add full-size access if reason is for access.
843  if ( ! is_active && is_complete && why == ForAccess)
844  {
845  info.WriteIOStatSingle(info.GetFileSize());
846  info.Write(infoFile, i_name.c_str());
847  }
848  }
849  infoFile->Close();
850  }
851  delete infoFile;
852 
853  if ( ! is_active) m_active_cond.UnLock();
854 
855  if (read_ok)
856  {
857  if ((is_complete || why == ForInfo) && buff != 0)
858  {
859  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
860  if (res2 < 0)
861  return res2;
862 
863  // Normally, files are owned by us but when direct cache access
864  // is wanted and possible, make sure the file is world readable.
865  if (why == ForAccess)
866  {mode_t mode = (forall ? worldReadable : groupReadable);
867  if (((sbuff.st_mode & worldReadable) != mode)
868  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
869  {is_complete = false;
870  *buff = 0;
871  }
872  }
873  }
874 
875  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
876  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
877 
878  return is_complete ? 0 : -EREMOTE;
879  }
880  }
881  }
882 
883  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
884  return -ENOENT;
885 }
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873

References XrdOss::Chmod(), XrdOssDF::Close(), Macaroons::Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 731 of file XrdPfc.cc.

732 {
733  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
734 
735  while (true)
736  {
737  m_RAM_mutex.Lock();
738  bool doPrefetch = (m_RAM_used < limit_RAM);
739  m_RAM_mutex.UnLock();
740 
741  if (doPrefetch)
742  {
743  File* f = GetNextFileToPrefetch();
744  f->Prefetch();
745  }
746  else
747  {
748  XrdSysTimer::Wait(5);
749  }
750  }
751 }
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:712
void Prefetch()
Definition: XrdPfcFile.cc:1485
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

int Cache::Prepare ( const char *  curl,
int  oflags,
mode_t  mode 
)
virtual

Prepare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow deferring an open request or implementing the full I/O stack in the cache layer.

Returns
<0 - Error has occurred, return value is -errno; fail open request.
=0 - Continue with open() request.
>0 - Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1009 of file XrdPfc.cc.

1010 {
1011  XrdCl::URL url(curl);
1012  std::string f_name = url.GetPath();
1013  std::string i_name = f_name + Info::s_infoExtension;
1014 
1015  // Do not allow write access.
1016  if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1017  {
1018  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1019  return -EROFS;
1020  }
1021 
1022  // Intercept xrdpfc_command requests.
1023  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1024  {
1025  // Schedule a job to process command request.
1026  {
1027  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1028 
1029  schedP->Schedule(ce);
1030  }
1031 
1032  return -EAGAIN;
1033  }
1034 
1035  {
1036  XrdSysCondVarHelper lock(&m_active_cond);
1037  m_purge_delay_set.insert(f_name);
1038  }
1039 
1040  struct stat sbuff;
1041  int res = m_oss->Stat(i_name.c_str(), &sbuff);
1042  if (res == 0)
1043  {
1044  TRACE(Dump, "Prepare defer open " << f_name);
1045  return 1;
1046  }
1047  else
1048  {
1049  return 0;
1050  }
1051 }
@ Warning
static XrdScheduler * schedP
Definition: XrdPfc.hh:285
void Schedule(XrdJob *jp)

References XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), stat(), XrdOss::Stat(), TRACE, and Warning.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 271 of file XrdPfc.cc.

272 {
273  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
274 
275  while (true)
276  {
277  m_writeQ.condVar.Lock();
278  while (m_writeQ.size == 0)
279  {
280  m_writeQ.condVar.Wait();
281  }
282 
283  // MT -- optimize to pop several blocks if they are available (or swap the list).
284  // This makes sense especially for smallish block sizes.
285 
286  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
287  long long sum_size = 0;
288 
289  for (int bi = 0; bi < n_pushed; ++bi)
290  {
291  Block* block = m_writeQ.queue.front();
292  m_writeQ.queue.pop_front();
293  m_writeQ.writes_between_purges += block->get_size();
294  sum_size += block->get_size();
295 
296  blks_to_write[bi] = block;
297 
298  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
299  }
300  m_writeQ.size -= n_pushed;
301 
302  m_writeQ.condVar.UnLock();
303 
304  {
305  XrdSysMutexHelper lock(&m_RAM_mutex);
306  m_RAM_write_queue -= sum_size;
307  }
308 
309  for (int bi = 0; bi < n_pushed; ++bi)
310  {
311  Block* block = blks_to_write[bi];
312 
313  block->m_file->WriteBlockToDisk(block);
314  }
315  }
316 }
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1470
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1008

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration& XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 199 of file XrdPfc.hh.

199 { return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), XrdPfc::File::WriteBlockToDisk(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor& XrdPfc::Cache::RefResMon ( )
inline

Definition at line 280 of file XrdPfc.hh.

280 { return *m_res_mon; }

Referenced by ResMon().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File file)

Definition at line 674 of file XrdPfc.cc.

675 {
676  // Can be called with other locks held.
677 
678  if ( ! m_prefetch_enabled)
679  {
680  return;
681  }
682 
683  m_prefetch_condVar.Lock();
684  m_prefetchList.push_back(file);
685  m_prefetch_condVar.Signal();
686  m_prefetch_condVar.UnLock();
687 }

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File f,
IO io 
)

Definition at line 468 of file XrdPfc.cc.

469 {
470  // Called from virtual IO::DetachFinalize.
471 
472  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
473 
474  {
475  XrdSysCondVarHelper lock(&m_active_cond);
476 
477  f->RemoveIO(io);
478  }
479  dec_ref_cnt(f, true);
480 }
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:347

References Macaroons::Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char *  buf,
long long  size 
)

Definition at line 369 of file XrdPfc.cc.

370 {
371  bool std_size = (size == m_configuration.m_bufferSize);
372  {
373  XrdSysMutexHelper lock(&m_RAM_mutex);
374 
375  m_RAM_used -= size;
376 
377  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
378  {
379  m_RAM_std_blocks.push_back(buf);
380  ++m_RAM_std_size;
381  return;
382  }
383  }
384  free(buf);
385 }

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove from the write queue all blocks that belong to the given File. This method is used at the time of File destruction.

Definition at line 238 of file XrdPfc.cc.

239 {
240  std::list<Block*> removed_blocks;
241  long long sum_size = 0;
242 
243  m_writeQ.condVar.Lock();
244  std::list<Block*>::iterator i = m_writeQ.queue.begin();
245  while (i != m_writeQ.queue.end())
246  {
247  if ((*i)->m_file == file)
248  {
249  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
250  std::list<Block*>::iterator j = i++;
251  removed_blocks.push_back(*j);
252  sum_size += (*j)->get_size();
253  m_writeQ.queue.erase(j);
254  --m_writeQ.size;
255  }
256  else
257  {
258  ++i;
259  }
260  }
261  m_writeQ.condVar.UnLock();
262 
263  {
264  XrdSysMutexHelper lock(&m_RAM_mutex);
265  m_RAM_write_queue -= sum_size;
266  }
267 
268  file->BlocksRemovedFromWriteQ(removed_blocks);
269 }

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long  size)

Definition at line 329 of file XrdPfc.cc.

330 {
331  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
332 
333  bool std_size = (size == m_configuration.m_bufferSize);
334 
335  m_RAM_mutex.Lock();
336 
337  long long total = m_RAM_used + size;
338 
339  if (total <= m_configuration.m_RamAbsAvailable)
340  {
341  m_RAM_used = total;
342  if (std_size && m_RAM_std_size > 0)
343  {
344  char *buf = m_RAM_std_blocks.back();
345  m_RAM_std_blocks.pop_back();
346  --m_RAM_std_size;
347 
348  m_RAM_mutex.UnLock();
349 
350  return buf;
351  }
352  else
353  {
354  m_RAM_mutex.UnLock();
355  char *buf;
356  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
357  {
358  // Report out of mem? Probably should report it at least the first time,
359  // then periodically.
360  return 0;
361  }
362  return buf;
363  }
364  }
365  m_RAM_mutex.UnLock();
366  return 0;
367 }

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 133 of file XrdPfc.cc.

133 { return m_instance->RefResMon(); }
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:280

References RefResMon().

Referenced by XrdPfc::File::~File(), XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 273 of file XrdPfc.hh.

273 { schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char *  curl,
struct stat & sbuff 
)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1061 of file XrdPfc.cc.

1062 {
1063  XrdCl::URL url(curl);
1064  std::string f_name = url.GetPath();
1065 
1066  {
1067  XrdSysCondVarHelper lock(&m_active_cond);
1068  m_purge_delay_set.insert(f_name);
1069  }
1070 
1071  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK)
1072  {
1073  if (S_ISDIR(sbuff.st_mode))
1074  {
1075  return 0;
1076  }
1077  else
1078  {
1079  bool success = false;
1080  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
1081  XrdOucEnv myEnv;
1082 
1083  f_name += Info::s_infoExtension;
1084  int res = infoFile->Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1085  if (res >= 0)
1086  {
1087  Info info(m_trace, 0);
1088  if (info.Read(infoFile, f_name.c_str()))
1089  {
1090  sbuff.st_size = info.GetFileSize();
1091  success = true;
1092  }
1093  }
1094  infoFile->Close();
1095  delete infoFile;
1096  return success ? 0 : 1;
1097  }
1098  }
1099 
1100  return 1;
1101 }

References XrdOssDF::Close(), XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), and XrdOssOK.

Referenced by ExecuteCommandUrl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 131 of file XrdPfc.cc.

131 { return *m_instance; }

Referenced by XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char *  curl)
virtual
Returns
<0 - Unlink failed, value is -errno. =0 - Unlink succeeded.

Reimplemented from XrdOucCache.

Definition at line 1110 of file XrdPfc.cc.

1111 {
1112  XrdCl::URL url(curl);
1113  std::string f_name = url.GetPath();
1114 
1115  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1116 
1117  return UnlinkFile(f_name, false);
1118 }

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string &  f_name,
bool  fail_if_open 
)

Remove cinfo and data files from cache.

Definition at line 1120 of file XrdPfc.cc.

1121 {
1122  static const char* trc_pfx = "UnlinkFile ";
1123  ActiveMap_i it;
1124  File *file = 0;
1125  {
1126  XrdSysCondVarHelper lock(&m_active_cond);
1127 
1128  it = m_active.find(f_name);
1129 
1130  if (it != m_active.end())
1131  {
1132  if (fail_if_open)
1133  {
1134  TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1135  return -EBUSY;
1136  }
1137 
1138  // Null File* in m_active map means an operation is ongoing, probably
1139  // Attach() with possible File::Open(). Ask for retry.
1140  if (it->second == 0)
1141  {
1142  TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1143  return -EAGAIN;
1144  }
1145 
1146  file = it->second;
1147  file->initiate_emergency_shutdown();
1148  it->second = 0;
1149  }
1150  else
1151  {
1152  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1153  }
1154  }
1155 
1156  if (file)
1157  {
1158  RemoveWriteQEntriesFor(file);
1159  }
1160 
1161  std::string i_name = f_name + Info::s_infoExtension;
1162 
1163  // Unlink file & cinfo
1164  struct stat f_stat;
1165  bool stat_ok = (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK);
1166  int f_ret = m_oss->Unlink(f_name.c_str());
1167  int i_ret = m_oss->Unlink(i_name.c_str());
1168 
1169  if (stat_ok)
1170  m_res_mon->register_file_purge(f_name, f_stat.st_blocks);
1171 
1172  TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1173 
1174  {
1175  XrdSysCondVarHelper lock(&m_active_cond);
1176 
1177  m_active.erase(it);
1178  }
1179 
1180  return std::min(f_ret, i_ret);
1181 }
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:238
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:121
void register_file_purge(DirState *target, long long size_in_st_blocks)

References Macaroons::Debug, XrdPfc::File::initiate_emergency_shutdown(), XrdPfc::ResourceMonitor::register_file_purge(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, XrdOss::Unlink(), and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo &  urVersion)
inlinestatic

Version check.

Definition at line 228 of file XrdPfc.hh.

228 { return true; }

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 318 of file XrdPfc.cc.

319 {
320  // Called from ResourceMonitor for an alternative estimation of disk writes.
321  XrdSysCondVarHelper lock(&m_writeQ.condVar);
322  long long ret = m_writeQ.writes_between_purges;
323  m_writeQ.writes_between_purges = 0;
324  return ret;
325 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: