XRootD
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor. More...
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue. More...
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *, int Options=0)
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *ioP, int opts=0)=0
 Obtain a new IO object that fronts existing XrdOucCacheIO. More...
 
bool Config (const char *config_filename, const char *parameters)
 Parse configuration file. More...
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached. More...
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
FileGetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStreamGetGStream ()
 
XrdSysErrorGetLog ()
 
FileGetNextFileToPrefetch ()
 
XrdOssGetOss () const
 
XrdSysTraceGetTrace ()
 
bool IsFileActiveOrPurgeProtected (const std::string &)
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk. More...
 
void Purge ()
 Thread function invoked to scan and purge files from disk when needed. More...
 
const ConfigurationRefConfiguration () const
 Reference XrdPfc configuration. More...
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction. More...
 
char * RequestRAM (long long size)
 
void ResourceMonitorHeartBeat ()
 Thread function checking resource usage periodically. More...
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache. More...
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor. More...
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const ConfigurationConf ()
 
static CacheCreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation. More...
 
static CacheGetInstance ()
 Singleton access. More...
 
static const CacheTheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check. More...
 

Static Public Attributes

static XrdSchedulerschedP = 0
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file) More...
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write) More...
 
static const int optRW = 0x0004
 File is read/write (o/w read/only) More...
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache. More...
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc). More...
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 266 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger logger,
XrdOucEnv env 
)

Constructor.

Definition at line 188 of file XrdPfc.cc.

188  :
189  XrdOucCache("pfc"),
190  m_env(env),
191  m_log(logger, "XrdPfc_"),
192  m_trace(new XrdSysTrace("XrdPfc", logger)),
193  m_traceID("Cache"),
194  m_oss(0),
195  m_gstream(0),
196  m_prefetch_condVar(0),
197  m_prefetch_enabled(false),
198  m_RAM_used(0),
199  m_RAM_write_queue(0),
200  m_RAM_std_size(0),
201  m_isClient(false),
202  m_in_purge(false),
203  m_active_cond(0),
204  m_stats_n_purge_cond(0),
205  m_fs_state(0),
206  m_last_scan_duration(0),
207  m_last_purge_duration(0),
208  m_spt_state(SPTS_Idle)
209 {
210  // Default log level is Warning.
211  m_trace->What = 2;
212 }
XrdOucCache(const char *ctype)
Definition: XrdOucCache.hh:700

References XrdSysTrace::What.

Referenced by CreateInstance().

+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block b,
bool  from_read 
)

Add downloaded block in write queue.

Definition at line 256 of file XrdPfc.cc.

257 {
258  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
259 
260  {
261  XrdSysMutexHelper lock(&m_RAM_mutex);
262  m_RAM_write_queue += b->get_size();
263  }
264 
265  m_writeQ.condVar.Lock();
266  if (fromRead)
267  m_writeQ.queue.push_back(b);
268  else
269  m_writeQ.queue.push_front(b);
270  m_writeQ.size++;
271  m_writeQ.condVar.Signal();
272  m_writeQ.condVar.UnLock();
273 }
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int get_size() const
Definition: XrdPfcFile.hh:147
File * get_file() const
Definition: XrdPfcFile.hh:151
long long m_offset
Definition: XrdPfcFile.hh:125
std::string & GetLocalPath()
Definition: XrdPfcFile.hh:275

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach() [1/2]

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO io,
int  Options = 0 
)
virtual

Implements XrdOucCache.

Definition at line 214 of file XrdPfc.cc.

215 {
216  const char* tpfx = "Attach() ";
217 
218  if (Cache::GetInstance().Decide(io))
219  {
220  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
221 
222  IO *cio;
223 
224  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
225  {
226  cio = new IOFileBlock(io, *this);
227  }
228  else
229  {
230  IOFile *iof = new IOFile(io, *this);
231 
232  if ( ! iof->HasFile())
233  {
234  delete iof;
235  // TODO - redirect instead. But this is kind of an awkward place for it.
236  // errno is set during IOFile construction.
237  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
238  return io;
239  }
240 
241  cio = iof;
242  }
243 
244  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
245  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
246 
247  return cio;
248  }
249  else
250  {
251  TRACE(Info, tpfx << "decision decline " << io->Path());
252  }
253  return io;
254 }
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:59
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:319
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:163
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:167
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:40
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:49
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45

References Macaroons::Debug, Decide(), Macaroons::Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ Attach() [2/2]

virtual XrdOucCacheIO* XrdOucCache::Attach

Obtain a new IO object that fronts existing XrdOucCacheIO.

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 165 of file XrdPfc.cc.

165 { return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::FPurgeState::CheckFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char *  config_filename,
const char *  parameters 
)

Parse configuration file.

Parameters
config_filenamepath to configuration file
parametersoptional parameters to be passed
Returns
parse status

Definition at line 351 of file XrdPfcConfiguration.cc.

352 {
353  // Indicate whether or not we are a client instance
354  const char *theINS = getenv("XRDINSTANCE");
355  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
356 
357  // Tell everyone else we are a caching proxy
358  XrdOucEnv::Export("XRDPFC", 1);
359 
360  XrdOucEnv myEnv;
361  XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
362 
363  if (! config_filename || ! *config_filename)
364  {
365  TRACE(Error, "Config() configuration file not specified.");
366  return false;
367  }
368 
369  int fd;
370  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
371  {
372  TRACE( Error, "Config() can't open configuration file " << config_filename);
373  return false;
374  }
375 
376  Config.Attach(fd);
377  static const char *cvec[] = { "*** pfc plugin config:", 0 };
378  Config.Capture(cvec);
379 
380  // Obtain OFS configurator for OSS plugin.
381  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
382  &XrdVERSIONINFOVAR(XrdOucGetCache));
383  if (! ofsCfg) return false;
384 
385  TmpConfiguration tmpc;
386 
387  // Adjust default parameters for client/serverless caching
388  if (m_isClient)
389  {
390  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
391  m_configuration.m_wqueue_blocks = 8;
392  m_configuration.m_wqueue_threads = 1;
393  }
394 
395  // If network checksum processing is the default, indicate so.
396  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
397 
398  // Actual parsing of the config file.
399  bool retval = true, aOK = true;
400  char *var;
401  while ((var = Config.GetMyFirstWord()))
402  {
403  if (! strcmp(var,"pfc.osslib"))
404  {
405  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
406  }
407  else if (! strcmp(var,"pfc.cschk"))
408  {
409  retval = xcschk(Config);
410  }
411  else if (! strcmp(var,"pfc.decisionlib"))
412  {
413  retval = xdlib(Config);
414  }
415  else if (! strcmp(var,"pfc.trace"))
416  {
417  retval = xtrace(Config);
418  }
419  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
420  {
421  m_configuration.m_allow_xrdpfc_command = true;
422  }
423  else if (! strncmp(var,"pfc.", 4))
424  {
425  retval = ConfigParameters(std::string(var+4), Config, tmpc);
426  }
427 
428  if ( ! retval)
429  {
430  TRACE(Error, "Config() error in parsing");
431  aOK = false;
432  }
433  }
434 
435  Config.Close();
436 
437  // Load OSS plugin.
438  myEnv.Put("oss.runmode", "pfc");
439  if (m_configuration.is_cschk_cache())
440  {
441  char csi_conf[128];
442  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
443  {
444  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
445  } else {
446  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
447  return false;
448  }
449  }
450  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
451  {
452  ofsCfg->Plugin(m_oss);
453  }
454  else
455  {
456  TRACE(Error, "Config() Unable to create an OSS object");
457  return false;
458  }
459 
460  // Test if OSS is operational, determine optional features.
461  aOK &= test_oss_basics_and_features();
462 
463  // sets default value for disk usage
464  XrdOssVSInfo sP;
465  {
466  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
467  {
468  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
469  return false;
470  }
471  if (sP.Total < 10ll << 20)
472  {
473  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
474  m_configuration.m_data_space.c_str());
475  return false;
476  }
477 
478  m_configuration.m_diskTotalSpace = sP.Total;
479 
480  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
481  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
482  {
483  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
484  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
485  aOK = false;
486  }
487  }
488  else aOK = false;
489 
490  if ( ! tmpc.m_fileUsageMax.empty())
491  {
492  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
493  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
494  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
495  {
496  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
497  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
498  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
499  {
500  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
501  aOK = false;
502  }
503  }
504  else aOK = false;
505  }
506  }
507 
508  // sets flush frequency
509  if ( ! tmpc.m_flushRaw.empty())
510  {
511  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
512  {
513  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
514  &m_configuration.m_flushCnt,
515  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
516  {
517  return false;
518  }
519  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
520  }
521  else
522  {
523  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
524  &m_configuration.m_flushCnt, 100, 100000))
525  {
526  return false;
527  }
528  }
529  }
530 
531  // get number of available RAM blocks after process configuration
532  if (m_configuration.m_RamAbsAvailable == 0)
533  {
534  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
535  char buff[1024];
536  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
537  m_log.Say("Config info: ", buff);
538  }
539  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
540  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
541 
542  // Set tracing to debug if this is set in environment
543  char* cenv = getenv("XRDDEBUG");
544  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
545 
546  if (aOK)
547  {
548  int loff = 0;
549 // 000 001 010
550  const char *csc[] = {"off", "cache nonet", "nocache net notls",
551 // 011
552  "cache net notls",
553 // 100 101 110
554  "off", "cache nonet", "nocache net tls",
555 // 111
556  "cache net tls"};
557  char buff[8192], uvk[32];
558  if (m_configuration.m_cs_UVKeep < 0)
559  strcpy(uvk, "lru");
560  else
561  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
562  float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
563  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
564  " pfc.cschk %s uvkeep %s\n"
565  " pfc.blocksize %lld\n"
566  " pfc.prefetch %d\n"
567  " pfc.ram %.fg\n"
568  " pfc.writequeue %d %d\n"
569  " # Total available disk: %lld\n"
570  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
571  " pfc.spaces %s %s\n"
572  " pfc.trace %d\n"
573  " pfc.flush %lld\n"
574  " pfc.acchistorysize %d\n"
575  " pfc.onlyIfCachedMinBytes %lld\n"
576  " pfc.onlyIfCachedMinFrac %.2f\n",
577  config_filename,
578  csc[int(m_configuration.m_cs_Chk)], uvk,
579  m_configuration.m_bufferSize,
580  m_configuration.m_prefetch_max_blocks,
581  rg,
582  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
583  sP.Total,
584  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
585  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
586  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
587  m_configuration.m_data_space.c_str(),
588  m_configuration.m_meta_space.c_str(),
589  m_trace->What,
590  m_configuration.m_flushCnt,
591  m_configuration.m_accHistorySize,
592  m_configuration.m_onlyIfCachedMinSize,
593  m_configuration.m_onlyIfCachedMinFrac);
594 
595  if (m_configuration.is_dir_stat_reporting_on())
596  {
597  loff += snprintf(buff + loff, sizeof(buff) - loff,
598  " pfc.dirstats maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
599  m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
600  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
601  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
602  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
603  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
604  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
605  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
606  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
607  }
608 
609  if (m_configuration.m_hdfsmode)
610  {
611  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
612  }
613 
614  if (m_configuration.m_username.empty())
615  {
616  char unameBuff[256];
617  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
618  m_configuration.m_username = unameBuff;
619  }
620  else
621  {
622  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
623  }
624 
625  m_log.Say(buff);
626 
627  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
628  }
629 
630  // Derived settings
631  m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
632  Info::s_maxNumAccess = m_configuration.m_accHistorySize;
633 
634  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
635 
636  m_log.Say("Config Proxy File Cache g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive");
637 
638  m_log.Say("------ Proxy File Cache configuration parsing ", aOK ? "completed" : "failed");
639 
640  if (ofsCfg) delete ofsCfg;
641 
642  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
643  // Building of xrdpfc_print fails when this is enabled.
644 #ifdef XRDPFC_CKSUM_TEST
645  {
646  int xxx = m_configuration.m_cs_Chk;
647 
648  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
649  {
650  Info::TestCksumStuff();
651  }
652 
653  m_configuration.m_cs_Chk = xxx;
654  }
655 #endif
656 
657  return aOK;
658 }
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:84
int open(const char *path, int oflag,...)
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition: XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:315
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
@ CSChk_Both
Definition: XrdPfcTypes.hh:23
@ CSChk_None
Definition: XrdPfcTypes.hh:23
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:108
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:102
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition: XrdPfc.hh:109
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:94
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:105
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:85
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:88
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:99
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:79
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:87
bool is_cschk_cache() const
Definition: XrdPfc.hh:69
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:97
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:106
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:113
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:89
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:112
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:78
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:92
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:82
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:96
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:83
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:104
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:81
bool is_cschk_net() const
Definition: XrdPfc.hh:70
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:116
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:111
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:98
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:91
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:115
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:62
std::string m_diskUsageLWM
Definition: XrdPfc.hh:123
std::string m_diskUsageHWM
Definition: XrdPfc.hh:124
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:125
std::string m_fileUsageNominal
Definition: XrdPfc.hh:126
std::string m_flushRaw
Definition: XrdPfc.hh:128
std::string m_fileUsageMax
Definition: XrdPfc.hh:127

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Macaroons::Error, XrdOucEnv::Export(), XrdOucEnv::GetPtr(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsMaxDepth, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdOfsConfigPI::New(), open(), XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char *  curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 994 of file XrdPfc.cc.

995 {
996  static const char* tpfx = "ConsiderCached ";
997 
998  TRACE(Debug, tpfx << curl);
999 
1000  XrdCl::URL url(curl);
1001  std::string f_name = url.GetPath();
1002 
1003  File *file = nullptr;
1004  {
1005  XrdSysCondVarHelper lock(&m_active_cond);
1006  auto it = m_active.find(f_name);
1007  if (it != m_active.end()) {
1008  file = it->second;
1009  inc_ref_cnt(file, false, false);
1010  }
1011  }
1012  if (file) {
1013  struct stat sbuff;
1014  int res = file->Fstat(sbuff);
1015  dec_ref_cnt(file, false);
1016  if (res)
1017  return res;
1018  // DecideIfConsideredCached() already called in File::Fstat().
1019  return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1020  }
1021 
1022  struct stat sbuff;
1023  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1024  if (res != XrdOssOK) {
1025  TRACE(Debug, tpfx << curl << " -> " << res);
1026  return res;
1027  }
1028  if (S_ISDIR(sbuff.st_mode))
1029  {
1030  TRACE(Debug, tpfx << curl << " -> EISDIR");
1031  return -EISDIR;
1032  }
1033 
1034  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1035  if (file_size < 0) {
1036  TRACE(Debug, tpfx << curl << " -> " << file_size);
1037  return (int) file_size;
1038  }
1039  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1040 
1041  return is_cached ? 0 : -EREMOTE;
1042 }
#define XrdOssOK
Definition: XrdOss.hh:50
int stat(const char *path, struct stat *buf)
URL representation.
Definition: XrdClURL.hh:31
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:928
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition: XrdPfc.cc:961
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:491
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313

References Macaroons::Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger logger,
XrdOucEnv env 
)
static

Singleton creation.

Definition at line 156 of file XrdPfc.cc.

157 {
158  assert (m_instance == 0);
159  m_instance = new Cache(logger, env);
160  return *m_instance;
161 }
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:188

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
&URL of file
Returns
decision if IO object will be cached.

Definition at line 167 of file XrdPfc.cc.

168 {
169  if (! m_decisionpoints.empty())
170  {
171  XrdCl::URL url(io->Path());
172  std::string filename = url.GetPath();
173  std::vector<Decision*>::const_iterator it;
174  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
175  {
176  XrdPfc::Decision *d = *it;
177  if (! d) continue;
178  if (! d->Decide(filename, *m_oss))
179  {
180  return false;
181  }
182  }
183  }
184 
185  return true;
186 }
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long  file_size,
long long  bytes_on_disk 
)

Definition at line 961 of file XrdPfc.cc.

962 {
963  if (file_size == 0 || bytes_on_disk >= file_size)
964  return true;
965 
966  double frac_on_disk = (double) bytes_on_disk / file_size;
967 
968  if (file_size <= m_configuration.m_onlyIfCachedMinSize)
969  {
970  if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
971  return true;
972  }
973  else
974  {
975  if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
976  frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
977  return true;
978  }
979  return false;
980 }

References XrdPfc::Configuration::m_onlyIfCachedMinFrac, and XrdPfc::Configuration::m_onlyIfCachedMinSize.

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File file)

Definition at line 713 of file XrdPfc.cc.

714 {
715  // Can be called with other locks held.
716 
717  if ( ! m_prefetch_enabled)
718  {
719  return;
720  }
721 
722  m_prefetch_condVar.Lock();
723  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
724  {
725  if (*it == file)
726  {
727  m_prefetchList.erase(it);
728  break;
729  }
730  }
731  m_prefetch_condVar.UnLock();
732 }

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string &  cinfo_fname)

Definition at line 928 of file XrdPfc.cc.

929 {
930  if (m_metaXattr) {
931  char pfn[4096];
932  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
933  long long fsize = -1ll;
934  int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
935  if (res == sizeof(long long))
936  {
937  return fsize;
938  }
939  else
940  {
941  TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
942  }
943  }
944 
945  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
946  XrdOucEnv env;
947  int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
948  if (res < 0)
949  return res;
950  Info info(m_trace, 0);
951  if ( ! info.Read(infoFile, cinfo_fname.c_str()))
952  return -EBADF;
953  return info.GetFileSize();
954 }
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References Macaroons::Debug, XrdSysXAttr::Get(), XrdPfc::Info::GetFileSize(), XrdOss::Lfn2Pfn(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string &  command_url)

Definition at line 48 of file XrdPfcCommand.cc.

49 {
50  static const char *top_epfx = "ExecuteCommandUrl ";
51 
52  SplitParser cp(command_url, "/");
53 
54  std::string token = cp.get_token();
55 
56  if (token != "xrdpfc_command")
57  {
58  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
59  return;
60  }
61 
62  // Get the command
63  token = cp.get_token();
64 
65 
66  //================================================================
67  // create_file
68  //================================================================
69 
70  if (token == "create_file")
71  {
72  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
73  static const char* usage =
74  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
75  " Creates a cache file with given parameters. Data in file is random.\n"
76  " Useful for cache purge testing.\n"
77  "Notes:\n"
78  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
79  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
80  " . -t and -d can be given multiple times to record several accesses.\n"
81  " . Negative arguments given to -t are interpreted as relative to now.\n";
82 
83  const Configuration &conf = m_configuration;
84 
85  token = cp.get_token();
86 
87  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
88 
89  std::vector<char*> argv;
90  SplitParser ap(token, " ");
91  int argc = ap.fill_argv(argv);
92 
93  long long file_size = ONE_GB;
94  long long block_size = conf.m_bufferSize;
95  int access_time [MAX_ACCESSES];
96  int access_duration[MAX_ACCESSES];
97  int at_count = 0, ad_count = 0;
98  XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
99  "help", 1, "h",
100  "verbose", 1, "v",
101  "size", 1, "s",
102  "blocksize", 1, "b",
103  "time", 1, "t",
104  "duration", 1, "d",
105  (const char *) 0);
106 
107  time_t time_now = time(0);
108 
109  Spec.Set(argc, &argv[0]);
110  char theOpt;
111 
112  while ((theOpt = Spec.getopt()) != (char) -1)
113  {
114  switch (theOpt)
115  {
116  case 'h': {
117  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
118  return;
119  }
120  case 's': {
121  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
122  &file_size, 0ll, 32 * ONE_GB))
123  return;
124  break;
125  }
126  case 'b': {
127  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
128  &block_size, 0ll, 64 * ONE_MB))
129  return;
130  break;
131  }
132  case 't': {
133  if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
134  &access_time[at_count++], INT_MIN, INT_MAX))
135  return;
136  break;
137  }
138  case 'd': {
139  if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
140  &access_duration[ad_count++], 0, 24 * 3600))
141  return;
142  break;
143  }
144  default: {
145  TRACE(Error, err_prefix << "Unhandled command argument.");
146  return;
147  }
148  }
149  }
150  if (Spec.getarg())
151  {
152  TRACE(Error, err_prefix << "Options must take up all the arguments.");
153  return;
154  }
155 
156  if (at_count < 1) access_time [at_count++] = time_now - 10;
157  if (ad_count < 1) access_duration[ad_count++] = 10;
158 
159  if (at_count != ad_count)
160  {
161  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
162  return;
163  }
164 
165  std::string file_path (cp.get_reminder_with_delim());
166  std::string cinfo_path(file_path + Info::s_infoExtension);
167 
168  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
169 
170  // Check if cinfo exists ... bail out if it does.
171  {
172  struct stat infoStat;
173  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
174  {
175  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
176  return;
177  }
178  }
179 
180  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
181 
182  {
183  const char *myUser = conf.m_username.c_str();
184  XrdOucEnv myEnv;
185 
186  // Create the data file.
187 
188  char size_str[32]; sprintf(size_str, "%lld", file_size);
189  myEnv.Put("oss.asize", size_str);
190  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
191  int cret;
192  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
193  {
194  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
195  return;
196  }
197 
198  XrdOssDF *myFile = GetOss()->newFile(myUser);
199  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
200  {
201  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
202  delete myFile;
203  return;
204  }
205 
206  // Create the info file.
207 
208  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
209  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
210  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
211  {
212  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
213  myFile->Close(); delete myFile;
214  return;
215  }
216 
217  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
218  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
219  {
220  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
221  delete myInfoFile;
222  myFile->Close(); delete myFile;
223  return;
224  }
225 
226  // Allocate space for the data file.
227 
228  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
229  {
230  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
231  }
232 
233  // Fill up cinfo.
234 
235  Info myInfo(m_trace, false);
236  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
237  myInfo.SetAllBitsSynced();
238 
239  for (int i = 0; i < at_count; ++i)
240  {
241  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
242 
243  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
244  }
245 
246  myInfo.Write(myInfoFile, cinfo_path.c_str());
247 
248  myInfoFile->Close(); delete myInfoFile;
249  myFile->Close(); delete myFile;
250 
251  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
252 
253  {
254  XrdSysCondVarHelper lock(&m_writeQ.condVar);
255 
256  m_writeQ.writes_between_purges += file_size;
257  }
258  }
259  }
260 
261  //================================================================
262  // remove_file
263  //================================================================
264 
265  else if (token == "remove_file")
266  {
267  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
268  static const char* usage =
269  "Usage: remove_file/ [-h] /<path>\n"
270  " Removes given file from the cache unless it is currently open.\n"
271  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
272  "Notes:\n"
273  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
274 
275  token = cp.get_token();
276 
277  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
278 
279  std::vector<char*> argv;
280  SplitParser ap(token, " ");
281  int argc = ap.fill_argv(argv);
282 
283  XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
284  "help", 1, "h",
285  (const char *) 0);
286 
287  Spec.Set(argc, &argv[0]);
288  char theOpt;
289 
290  while ((theOpt = Spec.getopt()) != (char) -1)
291  {
292  switch (theOpt)
293  {
294  case 'h': {
295  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
296  return;
297  }
298  default: {
299  TRACE(Error, err_prefix << "Unhandled command argument.");
300  return;
301  }
302  }
303  }
304  if (Spec.getarg())
305  {
306  TRACE(Error, err_prefix << "Options must take up all the arguments.");
307  return;
308  }
309 
310  std::string f_name(cp.get_reminder());
311 
312  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
313 
314  int ret = UnlinkFile(f_name, true);
315 
316  TRACE(Info, err_prefix << "returned with status " << ret);
317  }
318 
319  //================================================================
320  // unknown command
321  //================================================================
322 
323  else
324  {
325  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
326  }
327 }
void usage()
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
bool Create
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
XrdOss * GetOss() const
Definition: XrdPfc.hh:389
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1105
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1172
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:56

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Macaroons::Debug, ERRNO_AND_ERRSTR, Macaroons::Error, XrdPfc::SplitParser::fill_argv(), XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdOucArgs::getarg(), XrdOssDF::getFD(), XrdOucArgs::getopt(), GetOss(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_username, MAX_ACCESSES, XrdOss::newFile(), ONE_GB, ONE_MB, XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdOucArgs::Set(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), stat(), Stat(), TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File f,
bool  high_debug 
)

Definition at line 568 of file XrdPfc.cc.

569 {
570  dec_ref_cnt(f, high_debug);
571 }

◆ GetFile()

File * Cache::GetFile ( const std::string &  path,
IO io,
long long  off = 0,
long long  filesize = 0 
)

Definition at line 415 of file XrdPfc.cc.

416 {
417  // Called from virtual IOFile constructor.
418 
419  TRACE(Debug, "GetFile " << path << ", io " << io);
420 
421  ActiveMap_i it;
422 
423  {
424  XrdSysCondVarHelper lock(&m_active_cond);
425 
426  while (true)
427  {
428  it = m_active.find(path);
429 
430  // File is not open or being opened. Mark it as being opened and
431  // proceed to opening it outside of while loop.
432  if (it == m_active.end())
433  {
434  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
435  break;
436  }
437 
438  if (it->second != 0)
439  {
440  it->second->AddIO(io);
441  inc_ref_cnt(it->second, false, true);
442 
443  return it->second;
444  }
445  else
446  {
447  // Wait for some change in m_active, then recheck.
448  m_active_cond.Wait();
449  }
450  }
451  }
452 
453  // This is always true, now that IOFileBlock is unsupported.
454  if (filesize == 0)
455  {
456  struct stat st;
457  int res = io->Fstat(st);
458  if (res < 0) {
459  errno = res;
460  TRACE(Error, "GetFile, could not get valid stat");
461  } else if (res > 0) {
462  errno = ENOTSUP;
463  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
464  } else {
465  filesize = st.st_size;
466  }
467  }
468 
469  File *file = 0;
470 
471  if (filesize >= 0)
472  {
473  file = File::FileOpen(path, off, filesize);
474  }
475 
476  {
477  XrdSysCondVarHelper lock(&m_active_cond);
478 
479  if (file)
480  {
481  inc_ref_cnt(file, false, true);
482  it->second = file;
483 
484  file->AddIO(io);
485  }
486  else
487  {
488  m_active.erase(it);
489  }
490 
491  m_active_cond.Broadcast();
492  }
493 
494  return file;
495 }
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:100
void AddIO(IO *io)
Definition: XrdPfcFile.cc:296

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Macaroons::Debug, Macaroons::Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), stat(), TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream* XrdPfc::Cache::GetGStream ( )
inline

Definition at line 404 of file XrdPfc.hh.

404 { return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 163 of file XrdPfc.cc.

163 { return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::GetTrace(), PrefetchThread(), ProcessWriteTaskThread(), PurgeThread(), ResourceMonitorHeartBeatThread(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError* XrdPfc::Cache::GetLog ( )
inline

Definition at line 401 of file XrdPfc.hh.

401 { return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 735 of file XrdPfc.cc.

736 {
737  m_prefetch_condVar.Lock();
738  while (m_prefetchList.empty())
739  {
740  m_prefetch_condVar.Wait();
741  }
742 
743  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
744 
745  size_t l = m_prefetchList.size();
746  int idx = rand() % l;
747  File* f = m_prefetchList[idx];
748 
749  m_prefetch_condVar.UnLock();
750  return f;
751 }

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss* XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 389 of file XrdPfc.hh.

389 { return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace* XrdPfc::Cache::GetTrace ( )
inline

Definition at line 402 of file XrdPfc.hh.

402 { return m_trace; }

Referenced by XrdPfc::File::GetTrace(), XrdPfc::IO::GetTrace(), and XrdPfc::GetTrace().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string &  path)

Definition at line 684 of file XrdPfc.cc.

685 {
686  XrdSysCondVarHelper lock(&m_active_cond);
687 
688  return m_active.find(path) != m_active.end() ||
689  m_purge_delay_set.find(path) != m_purge_delay_set.end();
690 }

Referenced by Purge().

+ Here is the caller graph for this function:

◆ LocalFilePath()

int Cache::LocalFilePath ( const char *  curl,
char *  buff = 0,
int  blen = 0,
LFP_Reason  why = ForAccess,
bool  forall = false 
)
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 796 of file XrdPfc.cc.

798 {
799  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
800  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
801  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
802 
803  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
804 
805  if (buff && blen > 0) buff[0] = 0;
806 
807  XrdCl::URL url(curl);
808  std::string f_name = url.GetPath();
809  std::string i_name = f_name + Info::s_infoExtension;
810 
811  if (why == ForPath)
812  {
813  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
814  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
815  return ret;
816  }
817 
818  {
819  XrdSysCondVarHelper lock(&m_active_cond);
820  m_purge_delay_set.insert(f_name);
821  }
822 
823  struct stat sbuff, sbuff2;
824  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
825  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
826  {
827  if (S_ISDIR(sbuff.st_mode))
828  {
829  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
830  return -EISDIR;
831  }
832  else
833  {
834  bool read_ok = false;
835  bool is_complete = false;
836 
837  // Lock and check if the file is active. If NOT, keep the lock
838  // and add dummy access after successful reading of info file.
839  // If it IS active, just release the lock, this ongoing access will
840  // assure the file continues to exist.
841 
842  // XXXX How can I just loop over the cinfo file when active?
843  // Can I not get is_complete from the existing file?
844  // Do I still want to inject access record?
845  // Oh, it writes only if not active .... still let's try to use existing File.
846 
847  m_active_cond.Lock();
848 
849  bool is_active = m_active.find(f_name) != m_active.end();
850 
851  if (is_active) m_active_cond.UnLock();
852 
853  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
854  XrdOucEnv myEnv;
855  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
856  if (res >= 0)
857  {
858  Info info(m_trace, 0);
859  if (info.Read(infoFile, i_name.c_str()))
860  {
861  read_ok = true;
862 
863  is_complete = info.IsComplete();
864 
865  // Add full-size access if reason is for access.
866  if ( ! is_active && is_complete && why == ForAccess)
867  {
868  info.WriteIOStatSingle(info.GetFileSize());
869  info.Write(infoFile, i_name.c_str());
870  }
871  }
872  infoFile->Close();
873  }
874  delete infoFile;
875 
876  if ( ! is_active) m_active_cond.UnLock();
877 
878  if (read_ok)
879  {
880  if ((is_complete || why == ForInfo) && buff != 0)
881  {
882  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
883  if (res2 < 0)
884  return res2;
885 
886  // Normally, files are owned by us but when direct cache access
887  // is wanted and possible, make sure the file is world readable.
888  if (why == ForAccess)
889  {mode_t mode = (forall ? worldReadable : groupReadable);
890  if (((sbuff.st_mode & worldReadable) != mode)
891  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
892  {is_complete = false;
893  *buff = 0;
894  }
895  }
896  }
897 
898  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
899  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
900 
901  return is_complete ? 0 : -EREMOTE;
902  }
903  }
904  }
905 
906  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
907  return -ENOENT;
908 }
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0

References XrdOss::Chmod(), XrdOssDF::Close(), Macaroons::Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 754 of file XrdPfc.cc.

755 {
756  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
757 
758  while (true)
759  {
760  m_RAM_mutex.Lock();
761  bool doPrefetch = (m_RAM_used < limit_RAM);
762  m_RAM_mutex.UnLock();
763 
764  if (doPrefetch)
765  {
767  f->Prefetch();
768  }
769  else
770  {
772  }
773  }
774 }
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:735
void Prefetch()
Definition: XrdPfcFile.cc:1462
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

int Cache::Prepare ( const char *  curl,
int  oflags,
mode_t  mode 
)
virtual

Preapare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow defering an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1054 of file XrdPfc.cc.

1055 {
1056  XrdCl::URL url(curl);
1057  std::string f_name = url.GetPath();
1058  std::string i_name = f_name + Info::s_infoExtension;
1059 
1060  // Do not allow write access.
1061  if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1062  {
1063  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1064  return -EROFS;
1065  }
1066 
1067  // Intercept xrdpfc_command requests.
1068  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1069  {
1070  // Schedule a job to process command request.
1071  {
1072  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1073 
1074  schedP->Schedule(ce);
1075  }
1076 
1077  return -EAGAIN;
1078  }
1079 
1080  {
1081  XrdSysCondVarHelper lock(&m_active_cond);
1082  m_purge_delay_set.insert(f_name);
1083  }
1084 
1085  struct stat sbuff;
1086  if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1087  {
1088  TRACE(Dump, "Prepare defer open " << f_name);
1089  return 1;
1090  }
1091  else
1092  {
1093  return 0;
1094  }
1095 }
@ Warning
static XrdScheduler * schedP
Definition: XrdPfc.hh:408
void Schedule(XrdJob *jp)

References XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), stat(), XrdOss::Stat(), TRACE, Warning, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 308 of file XrdPfc.cc.

309 {
310  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
311 
312  while (true)
313  {
314  m_writeQ.condVar.Lock();
315  while (m_writeQ.size == 0)
316  {
317  m_writeQ.condVar.Wait();
318  }
319 
320  // MT -- optimize to pop several blocks if they are available (or swap the list).
321  // This makes sense especially for smallish block sizes.
322 
323  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
324  long long sum_size = 0;
325 
326  for (int bi = 0; bi < n_pushed; ++bi)
327  {
328  Block* block = m_writeQ.queue.front();
329  m_writeQ.queue.pop_front();
330  m_writeQ.writes_between_purges += block->get_size();
331  sum_size += block->get_size();
332 
333  blks_to_write[bi] = block;
334 
335  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
336  }
337  m_writeQ.size -= n_pushed;
338 
339  m_writeQ.condVar.UnLock();
340 
341  {
342  XrdSysMutexHelper lock(&m_RAM_mutex);
343  m_RAM_write_queue -= sum_size;
344  }
345 
346  for (int bi = 0; bi < n_pushed; ++bi)
347  {
348  Block* block = blks_to_write[bi];
349 
350  block->m_file->WriteBlockToDisk(block);
351  }
352  }
353 }
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1447
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:987

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Purge()

void XrdPfc::Cache::Purge ( )

Thread function invoked to scan and purge files from disk when needed.

Definition at line 698 of file XrdPfcPurge.cc.

699 {
700  static const char *trc_pfx = "Purge() ";
701 
702  XrdOucEnv env;
703  long long disk_usage;
704  long long estimated_file_usage = m_configuration.m_diskUsageHWM;
705 
706  // Pause before initial run
707  sleep(1);
708 
709  m_fs_state = new DataFsState;
710 
711  // { PathTokenizer p("/a/b/c/f.root", 2, true); p.deboog(); }
712  // { PathTokenizer p("/a/b/f.root", 2, true); p.deboog(); }
713  // { PathTokenizer p("/a/f.root", 2, true); p.deboog(); }
714  // { PathTokenizer p("/f.root", 2, true); p.deboog(); }
715 
716  int age_based_purge_countdown = 0; // enforce on first purge loop entry.
717  bool is_first = true;
718 
719  while (true)
720  {
721  time_t purge_start = time(0);
722 
723  {
724  XrdSysCondVarHelper lock(&m_active_cond);
725 
726  m_in_purge = true;
727  }
728 
729  TRACE(Info, trc_pfx << "Started.");
730 
731  // Bytes to remove based on total disk usage (d) and file usage (f).
732  long long bytesToRemove_d = 0, bytesToRemove_f = 0;
733 
734  // get amount of space to potentially erase based on total disk usage
735  XrdOssVSInfo sP; // Make sure we start when a clean slate in each loop
736  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
737  {
738  TRACE(Error, trc_pfx << "can't get StatVS for oss space " << m_configuration.m_data_space);
739  continue;
740  }
741  else
742  {
743  disk_usage = sP.Total - sP.Free;
744  TRACE(Debug, trc_pfx << "used disk space " << disk_usage << " bytes.");
745 
746  if (disk_usage > m_configuration.m_diskUsageHWM)
747  {
748  bytesToRemove_d = disk_usage - m_configuration.m_diskUsageLWM;
749  }
750  }
751 
752  // estimate amount of space to erase based on file usage
753  if (m_configuration.are_file_usage_limits_set())
754  {
755  long long estimated_writes_since_last_purge;
756  {
757  XrdSysCondVarHelper lock(&m_writeQ.condVar);
758 
759  estimated_writes_since_last_purge = m_writeQ.writes_between_purges;
760  m_writeQ.writes_between_purges = 0;
761  }
762  estimated_file_usage += estimated_writes_since_last_purge;
763 
764  TRACE(Debug, trc_pfx << "estimated usage by files " << estimated_file_usage << " bytes.");
765 
766  bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
767 
768  // Here we estimate fractional usages -- to decide if full scan is necessary before actual purge.
769  double frac_du = 0, frac_fu = 0;
770  m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
771 
772  if (frac_fu > 1.0 - frac_du)
773  {
774  bytesToRemove_f = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
775  }
776  }
777 
778  long long bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
779 
780  bool enforce_age_based_purge = false;
781  if (m_configuration.is_age_based_purge_in_effect() || m_configuration.is_uvkeep_purge_in_effect())
782  {
783  // XXXX ... I could collect those guys in larger vectors (maps?) and do traversal when
784  // they are empty.
785  if (--age_based_purge_countdown <= 0)
786  {
787  enforce_age_based_purge = true;
788  age_based_purge_countdown = m_configuration.m_purgeAgeBasedPeriod;
789  }
790  }
791 
792  bool enforce_traversal_for_usage_collection = is_first;
793  // XXX Other conditions? Periodic checks?
794 
795  copy_out_active_stats_and_update_data_fs_state();
796 
797  TRACE(Debug, trc_pfx << "Precheck:");
798  TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
799  TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (" << (is_first ? "max possible for initial run" : "estimated") << ")");
800  TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
801  TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
802  is_first = false;
803 
804  long long bytesToRemove_at_start = 0; // set after file scan
805  int deleted_file_count = 0;
806 
807  bool purge_required = (bytesToRemove > 0 || enforce_age_based_purge);
808 
809  // XXXX-PurgeOpt Need to retain this state between purges so I can avoid doing
810  // the traversal more often than really needed.
811  FPurgeState purgeState(2 * bytesToRemove, *m_oss); // prepare twice more volume than required
812 
813  if (purge_required || enforce_traversal_for_usage_collection)
814  {
815  // Make a sorted map of file paths sorted by access time.
816 
817  if (m_configuration.is_age_based_purge_in_effect())
818  {
819  purgeState.setMinTime(time(0) - m_configuration.m_purgeColdFilesAge);
820  }
821  if (m_configuration.is_uvkeep_purge_in_effect())
822  {
823  purgeState.setUVKeepMinTime(time(0) - m_configuration.m_cs_UVKeep);
824  }
825 
826  XrdOssDF* dh = m_oss->newDir(m_configuration.m_username.c_str());
827  if (dh->Opendir("/", env) == XrdOssOK)
828  {
829  purgeState.begin_traversal(m_fs_state->get_root());
830 
831  purgeState.TraverseNamespace(dh);
832 
833  purgeState.end_traversal();
834 
835  dh->Close();
836  }
837  delete dh; dh = 0;
838 
839  estimated_file_usage = purgeState.getNBytesTotal();
840 
841  TRACE(Debug, trc_pfx << "actual usage by files " << estimated_file_usage << " bytes.");
842 
843  // Adjust bytesToRemove_f and then bytesToRemove based on actual file usage,
844  // possibly retreating below nominal file usage (but not below baseline file usage).
845  if (m_configuration.are_file_usage_limits_set())
846  {
847  bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
848 
849  double frac_du = 0, frac_fu = 0;
850  m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
851 
852  if (frac_fu > 1.0 - frac_du)
853  {
854  bytesToRemove = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
855  bytesToRemove = std::min(bytesToRemove, estimated_file_usage - m_configuration.m_fileUsageBaseline);
856  }
857  else
858  {
859  bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
860  }
861  }
862  else
863  {
864  bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
865  }
866  bytesToRemove_at_start = bytesToRemove;
867 
868  TRACE(Debug, trc_pfx << "After scan:");
869  TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
870  TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (measured)");
871  TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
872  TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
873  TRACE(Debug, "\tmin_time = " << purgeState.getMinTime());
874 
875  if (enforce_age_based_purge)
876  {
877  purgeState.MoveListEntriesToMap();
878  }
879  }
880 
881  // Dump statistcs before actual purging so maximum usage values get recorded.
882  // Should really go to gstream --- and should really go from Heartbeat.
883  if (m_configuration.is_dir_stat_reporting_on())
884  {
885  m_fs_state->dump_recursively();
886  }
887 
888  if (purge_required)
889  {
890  // Loop over map and remove files with oldest values of access time.
891  struct stat fstat;
892  size_t info_ext_len = strlen(Info::s_infoExtension);
893  int protected_cnt = 0;
894  long long protected_sum = 0;
895  for (FPurgeState::map_i it = purgeState.m_fmap.begin(); it != purgeState.m_fmap.end(); ++it)
896  {
897  // Finish when enough space has been freed but not while age-based purging is in progress.
898  // Those files are marked with time-stamp = 0.
899  if (bytesToRemove <= 0 && ! (enforce_age_based_purge && it->first == 0))
900  {
901  break;
902  }
903 
904  std::string &infoPath = it->second.path;
905  std::string dataPath = infoPath.substr(0, infoPath.size() - info_ext_len);
906 
907  if (IsFileActiveOrPurgeProtected(dataPath))
908  {
909  ++protected_cnt;
910  protected_sum += it->second.nBytes;
911  TRACE(Debug, trc_pfx << "File is active or purge-protected: " << dataPath << " size: " << it->second.nBytes);
912  continue;
913  }
914 
915  // remove info file
916  if (m_oss->Stat(infoPath.c_str(), &fstat) == XrdOssOK)
917  {
918  // cinfo file can be on another oss.space, do not subtract for now.
919  // Could be relevant for very small block sizes.
920  // bytesToRemove -= fstat.st_size;
921  // estimated_file_usage -= fstat.st_size;
922  // ++deleted_file_count;
923 
924  m_oss->Unlink(infoPath.c_str());
925  TRACE(Dump, trc_pfx << "Removed file: '" << infoPath << "' size: " << fstat.st_size);
926  }
927 
928  // remove data file
929  if (m_oss->Stat(dataPath.c_str(), &fstat) == XrdOssOK)
930  {
931  bytesToRemove -= it->second.nBytes;
932  estimated_file_usage -= it->second.nBytes;
933  ++deleted_file_count;
934 
935  m_oss->Unlink(dataPath.c_str());
936  TRACE(Dump, trc_pfx << "Removed file: '" << dataPath << "' size: " << it->second.nBytes << ", time: " << it->first);
937 
938  if (it->second.dirState != 0) // XXXX This should now always be true.
939  it->second.dirState->add_usage_purged(it->second.nBytes);
940  else
941  TRACE(Error, trc_pfx << "DirState not set for file '" << dataPath << "'.");
942  }
943  }
944  if (protected_cnt > 0)
945  {
946  TRACE(Info, trc_pfx << "Encountered " << protected_cnt << " protected files, sum of their size: " << protected_sum);
947  }
948 
949  m_fs_state->upward_propagate_usage_purged();
950  }
951 
952  {
953  XrdSysCondVarHelper lock(&m_active_cond);
954 
955  m_purge_delay_set.clear();
956  m_in_purge = false;
957  }
958 
959  int purge_duration = time(0) - purge_start;
960 
961  TRACE(Info, trc_pfx << "Finished, removed " << deleted_file_count << " data files, total size " <<
962  bytesToRemove_at_start - bytesToRemove << ", bytes to remove at end " << bytesToRemove << ", purge duration " << purge_duration);
963 
964  int sleep_time = m_configuration.m_purgeInterval - purge_duration;
965  if (sleep_time > 0)
966  {
967  sleep(sleep_time);
968  }
969  }
970 }
int fstat(int fildes, struct stat *buf)
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition: XrdOss.hh:79
long long Free
Definition: XrdOssVS.hh:91
virtual XrdOssDF * newDir(const char *tident)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition: XrdPfc.cc:684
DirState * get_root()
Definition: XrdPfcPurge.cc:193
void upward_propagate_usage_purged()
Definition: XrdPfcPurge.cc:202
map_t::iterator map_i
Definition: XrdPfcPurge.cc:238
bool is_uvkeep_purge_in_effect() const
Definition: XrdPfc.hh:61
bool are_file_usage_limits_set() const
Definition: XrdPfc.hh:59
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
Definition: XrdPfc.hh:93
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition: XrdPfc.cc:135
bool is_age_based_purge_in_effect() const
Definition: XrdPfc.hh:60

References XrdPfc::Configuration::are_file_usage_limits_set(), XrdPfc::FPurgeState::begin_traversal(), XrdPfc::Configuration::calculate_fractional_usages(), XrdOssDF::Close(), Macaroons::Debug, XrdPfc::DataFsState::dump_recursively(), XrdPfc::FPurgeState::end_traversal(), Macaroons::Error, XrdOssVSInfo::Free, fstat(), XrdPfc::DataFsState::get_root(), XrdPfc::FPurgeState::getMinTime(), XrdPfc::FPurgeState::getNBytesTotal(), XrdPfc::Configuration::is_age_based_purge_in_effect(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdPfc::Configuration::is_uvkeep_purge_in_effect(), IsFileActiveOrPurgeProtected(), XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::FPurgeState::m_fmap, XrdPfc::Configuration::m_purgeAgeBasedPeriod, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_username, XrdPfc::FPurgeState::MoveListEntriesToMap(), XrdOss::newDir(), XrdOssDF::Opendir(), XrdPfc::Info::s_infoExtension, XrdPfc::FPurgeState::setMinTime(), XrdPfc::FPurgeState::setUVKeepMinTime(), stat(), XrdOss::Stat(), XrdOss::StatVS(), XrdOssVSInfo::Total, TRACE, XrdPfc::FPurgeState::TraverseNamespace(), XrdOss::Unlink(), XrdPfc::DataFsState::upward_propagate_usage_purged(), and XrdOssOK.

Referenced by PurgeThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration& XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 319 of file XrdPfc.hh.

319 { return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), XrdPfc::File::WriteBlockToDisk(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File file)

Definition at line 697 of file XrdPfc.cc.

698 {
699  // Can be called with other locks held.
700 
701  if ( ! m_prefetch_enabled)
702  {
703  return;
704  }
705 
706  m_prefetch_condVar.Lock();
707  m_prefetchList.push_back(file);
708  m_prefetch_condVar.Signal();
709  m_prefetch_condVar.UnLock();
710 }

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File f,
IO io 
)

Definition at line 497 of file XrdPfc.cc.

498 {
499  // Called from virtual IO::DetachFinalize.
500 
501  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
502 
503  {
504  XrdSysCondVarHelper lock(&m_active_cond);
505 
506  f->RemoveIO(io);
507  }
508  dec_ref_cnt(f, true);
509 }
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:333

References Macaroons::Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char *  buf,
long long  size 
)

Definition at line 397 of file XrdPfc.cc.

398 {
399  bool std_size = (size == m_configuration.m_bufferSize);
400  {
401  XrdSysMutexHelper lock(&m_RAM_mutex);
402 
403  m_RAM_used -= size;
404 
405  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
406  {
407  m_RAM_std_blocks.push_back(buf);
408  ++m_RAM_std_size;
409  return;
410  }
411  }
412  free(buf);
413 }

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File f)

Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.

Definition at line 275 of file XrdPfc.cc.

276 {
277  std::list<Block*> removed_blocks;
278  long long sum_size = 0;
279 
280  m_writeQ.condVar.Lock();
281  std::list<Block*>::iterator i = m_writeQ.queue.begin();
282  while (i != m_writeQ.queue.end())
283  {
284  if ((*i)->m_file == file)
285  {
286  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
287  std::list<Block*>::iterator j = i++;
288  removed_blocks.push_back(*j);
289  sum_size += (*j)->get_size();
290  m_writeQ.queue.erase(j);
291  --m_writeQ.size;
292  }
293  else
294  {
295  ++i;
296  }
297  }
298  m_writeQ.condVar.UnLock();
299 
300  {
301  XrdSysMutexHelper lock(&m_RAM_mutex);
302  m_RAM_write_queue -= sum_size;
303  }
304 
305  file->BlocksRemovedFromWriteQ(removed_blocks);
306 }

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long  size)

Definition at line 357 of file XrdPfc.cc.

358 {
359  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
360 
361  bool std_size = (size == m_configuration.m_bufferSize);
362 
363  m_RAM_mutex.Lock();
364 
365  long long total = m_RAM_used + size;
366 
367  if (total <= m_configuration.m_RamAbsAvailable)
368  {
369  m_RAM_used = total;
370  if (std_size && m_RAM_std_size > 0)
371  {
372  char *buf = m_RAM_std_blocks.back();
373  m_RAM_std_blocks.pop_back();
374  --m_RAM_std_size;
375 
376  m_RAM_mutex.UnLock();
377 
378  return buf;
379  }
380  else
381  {
382  m_RAM_mutex.UnLock();
383  char *buf;
384  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
385  {
386  // Report out of mem? Probably should report it at least the first time,
387  // then periodically.
388  return 0;
389  }
390  return buf;
391  }
392  }
393  m_RAM_mutex.UnLock();
394  return 0;
395 }

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResourceMonitorHeartBeat()

void XrdPfc::Cache::ResourceMonitorHeartBeat ( )

Thread function checking resource usage periodically.

Definition at line 606 of file XrdPfcPurge.cc.

607 {
608  // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
609 
610  // Pause before initial run
611  sleep(1);
612 
613  // XXXX Setup initial / constant stats (total RAM, total disk, ???)
614 
617 
618  S.Lock();
619 
620  X.DiskSize = m_configuration.m_diskTotalSpace;
621 
622  X.MemSize = m_configuration.m_RamAbsAvailable;
623 
624  S.UnLock();
625 
626  // XXXX Schedule initial disk scan, time it!
627  //
628  // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
629  // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
630  //
631  // bool scan_and_purge_running = true;
632 
633  // XXXX Could we really hold last-usage for all files in memory?
634 
635  // XXXX Think how to handle disk-full, scan/purge not finishing:
636  // - start dropping things out of write queue, but only when RAM gets near full;
637  // - monitoring this then becomes a high-priority job, inner loop with sleep of,
638  // say, 5 or 10 seconds.
639 
640  while (true)
641  {
642  time_t heartbeat_start = time(0);
643 
644  // TRACE(Info, trc_pfx << "HeartBeat starting ...");
645 
646  // if sumary monitoring configured, pupulate OucCacheStats:
647  S.Lock();
648 
649  // - available / used disk space (files usage calculated elsewhere (maybe))
650 
651  // - RAM usage
652  { XrdSysMutexHelper lck(&m_RAM_mutex);
653  X.MemUsed = m_RAM_used;
654  X.MemWriteQ = m_RAM_write_queue;
655  }
656  // - files opened / closed etc
657 
658  // do estimate of available space
659  S.UnLock();
660 
661  // if needed, schedule purge in a different thread.
662  // purge is:
663  // - deep scan + gather FSPurgeState
664  // - actual purge
665  //
666  // this thread can continue running and, if needed, stop writing to disk
667  // if purge is taking too long.
668 
669  // think how data is passed / synchronized between this and purge thread
670 
671  // !!!! think how stat collection is done and propgated upwards;
672  // until now it was done once per purge-interval.
673  // now stats will be added up more often, but purge will be done
674  // only occasionally.
675  // also, do we report cumulative values or deltas? cumulative should
676  // be easier and consistent with summary data.
677  // still, some are state - like disk usage, num of files.
678 
679  // Do we take care of directories that need to be newly added into DirState hierarchy?
680  // I.e., when user creates new directories and these are covered by either full
681  // spec or by root + depth declaration.
682 
683  int heartbeat_duration = time(0) - heartbeat_start;
684 
685  // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
686 
687  // int sleep_time = m_configuration.m_purgeInterval - heartbeat_duration;
688  int sleep_time = 60 - heartbeat_duration;
689  if (sleep_time > 0)
690  {
691  sleep(sleep_time);
692  }
693  }
694 }
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
Definition: XrdOucCache.hh:686

References XrdOucCacheStats::CacheStats::DiskSize, XrdOucCacheStats::Lock(), XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_RamAbsAvailable, XrdOucCacheStats::CacheStats::MemSize, XrdOucCacheStats::CacheStats::MemUsed, XrdOucCacheStats::CacheStats::MemWriteQ, XrdOucCache::Statistics, XrdOucCacheStats::UnLock(), and XrdOucCacheStats::X.

Referenced by ResourceMonitorHeartBeatThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File f)
inline

Definition at line 397 of file XrdPfc.hh.

397 { schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char *  curl,
struct stat sbuff 
)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1105 of file XrdPfc.cc.

1106 {
1107  const char *tpfx = "Stat ";
1108 
1109  XrdCl::URL url(curl);
1110  std::string f_name = url.GetPath();
1111 
1112  File *file = nullptr;
1113  {
1114  XrdSysCondVarHelper lock(&m_active_cond);
1115  auto it = m_active.find(f_name);
1116  if (it != m_active.end()) {
1117  file = it->second;
1118  inc_ref_cnt(file, false, false);
1119  }
1120  }
1121  if (file) {
1122  int res = file->Fstat(sbuff);
1123  dec_ref_cnt(file, false);
1124  TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1125  return res;
1126  }
1127 
1128  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1129  if (res != XrdOssOK) {
1130  TRACE(Debug, tpfx << curl << " -> " << res);
1131  return 1; // res; -- for only-if-cached
1132  }
1133  if (S_ISDIR(sbuff.st_mode))
1134  {
1135  TRACE(Debug, tpfx << curl << " -> EISDIR");
1136  return -EISDIR;
1137  }
1138 
1139  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1140  if (file_size < 0) {
1141  TRACE(Debug, tpfx << curl << " -> " << file_size);
1142  return 1; // (int) file_size; -- for only-if-cached
1143  }
1144  sbuff.st_size = file_size;
1145  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1146  if ( ! is_cached) {
1147  sbuff.st_ctime = sbuff.st_mtime = sbuff.st_atime = 0;
1148  }
1149 
1150  TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1151 
1152  return 0;
1153 }

References Macaroons::Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 164 of file XrdPfc.cc.

164 { return *m_instance; }

◆ Unlink()

int Cache::Unlink ( const char *  curl)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information.

Reimplemented from XrdOucCache.

Definition at line 1162 of file XrdPfc.cc.

1163 {
1164  XrdCl::URL url(curl);
1165  std::string f_name = url.GetPath();
1166 
1167  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1168 
1169  return UnlinkFile(f_name, false);
1170 }

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string &  f_name,
bool  fail_if_open 
)

Remove cinfo and data files from cache.

Definition at line 1172 of file XrdPfc.cc.

1173 {
1174  ActiveMap_i it;
1175  File *file = 0;
1176  {
1177  XrdSysCondVarHelper lock(&m_active_cond);
1178 
1179  it = m_active.find(f_name);
1180 
1181  if (it != m_active.end())
1182  {
1183  if (fail_if_open)
1184  {
1185  TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1186  return -EBUSY;
1187  }
1188 
1189  // Null File* in m_active map means an operation is ongoing, probably
1190  // Attach() with possible File::Open(). Ask for retry.
1191  if (it->second == 0)
1192  {
1193  TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1194  return -EAGAIN;
1195  }
1196 
1197  file = it->second;
1199  it->second = 0;
1200  }
1201  else
1202  {
1203  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1204  }
1205  }
1206 
1207  if (file)
1208  {
1209  RemoveWriteQEntriesFor(file);
1210  }
1211 
1212  std::string i_name = f_name + Info::s_infoExtension;
1213 
1214  // Unlink file & cinfo
1215  int f_ret = m_oss->Unlink(f_name.c_str());
1216  int i_ret = m_oss->Unlink(i_name.c_str());
1217 
1218  TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1219 
1220  {
1221  XrdSysCondVarHelper lock(&m_active_cond);
1222 
1223  m_active.erase(it);
1224  }
1225 
1226  return std::min(f_ret, i_ret);
1227 }
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:275
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:113

References Macaroons::Debug, XrdPfc::File::initiate_emergency_shutdown(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, TRACE, and XrdOss::Unlink().

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo &  urVersion)
inlinestatic

Version check.

Definition at line 346 of file XrdPfc.hh.

346 { return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int  cinfo_fd,
long long  file_size 
)

Definition at line 913 of file XrdPfc.cc.

914 {
915  if (m_metaXattr) {
916  int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
917  if (res != 0) {
918  TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
919  }
920  }
921 }
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Macaroons::Debug, XrdSysXAttr::Set(), TRACE, and XrdSysXAttrActive.

+ Here is the call graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = 0
static

Definition at line 408 of file XrdPfc.hh.

Referenced by XrdPfc::IO::Detach(), Prepare(), and XrdOucGetCache().


The documentation for this class was generated from the following files: