22 #include <sys/statvfs.h>
47 Cache * Cache::m_instance = 0;
81 const char *config_filename,
82 const char *parameters,
86 err.
Say(
"++++++ Proxy file cache initialization started.");
97 if (! instance.
Config(config_filename, parameters))
99 err.
Say(
"Config Proxy file cache initialization failed.");
102 err.
Say(
"------ Proxy file cache initialization completed.");
107 for (
int wti = 0; wti < instance.
RefConfiguration().m_wqueue_threads; ++wti)
123 env->
PutPtr(
"XrdFSCtl_PC*", pfcFSctl);
132 double &frac_du,
double &frac_fu)
146 frac_du = std::min( std::max( frac_du, 0.0), 1.0 );
147 frac_fu = std::min( std::max( frac_fu, 0.0), 1.0 );
154 assert (m_instance == 0);
155 m_instance =
new Cache(logger, env);
165 if (! m_decisionpoints.empty())
168 std::string filename = url.
GetPath();
169 std::vector<Decision*>::const_iterator it;
170 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
174 if (! d->
Decide(filename, *m_oss))
187 m_log(logger,
"XrdPfc_"),
192 m_prefetch_condVar(0),
193 m_prefetch_enabled(false),
195 m_RAM_write_queue(0),
200 m_stats_n_purge_cond(0),
202 m_last_scan_duration(0),
203 m_last_purge_duration(0),
204 m_spt_state(SPTS_Idle)
212 const char* tpfx =
"Attach() ";
233 TRACE(
Error, tpfx <<
"Failed opening local file, falling back to remote access " << io->
Path());
241 ((loc && loc[0] != 0) ? loc :
"<deferred open>"));
261 m_writeQ.condVar.Lock();
263 m_writeQ.queue.push_back(b);
265 m_writeQ.queue.push_front(b);
267 m_writeQ.condVar.Signal();
268 m_writeQ.condVar.UnLock();
273 std::list<Block*> removed_blocks;
274 long long sum_size = 0;
276 m_writeQ.condVar.Lock();
277 std::list<Block*>::iterator i = m_writeQ.queue.begin();
278 while (i != m_writeQ.queue.end())
280 if ((*i)->m_file == file)
282 TRACE(Dump,
"Remove entries for " << (
void*)(*i) <<
" path " << file->
lPath());
283 std::list<Block*>::iterator j = i++;
284 removed_blocks.push_back(*j);
285 sum_size += (*j)->get_size();
286 m_writeQ.queue.erase(j);
294 m_writeQ.condVar.UnLock();
298 m_RAM_write_queue -= sum_size;
310 m_writeQ.condVar.Lock();
311 while (m_writeQ.size == 0)
313 m_writeQ.condVar.Wait();
319 int n_pushed = std::min(m_writeQ.size, m_configuration.
m_wqueue_blocks);
320 long long sum_size = 0;
322 for (
int bi = 0; bi < n_pushed; ++bi)
324 Block* block = m_writeQ.queue.front();
325 m_writeQ.queue.pop_front();
326 m_writeQ.writes_between_purges += block->
get_size();
329 blks_to_write[bi] = block;
331 TRACE(Dump,
"ProcessWriteTasks for block " << (
void*)(block) <<
" path " << block->
m_file->
lPath());
333 m_writeQ.size -= n_pushed;
335 m_writeQ.condVar.UnLock();
339 m_RAM_write_queue -= sum_size;
342 for (
int bi = 0; bi < n_pushed; ++bi)
344 Block* block = blks_to_write[bi];
355 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
361 long long total = m_RAM_used + size;
366 if (std_size && m_RAM_std_size > 0)
368 char *buf = m_RAM_std_blocks.back();
369 m_RAM_std_blocks.pop_back();
380 if (posix_memalign((
void**) &buf, s_block_align, (
size_t) size))
403 m_RAM_std_blocks.push_back(buf);
415 TRACE(
Debug,
"GetFile " << path <<
", io " << io);
424 it = m_active.find(path);
428 if (it == m_active.end())
430 it = m_active.insert(std::make_pair(path, (
File*) 0)).first;
436 it->second->AddIO(io);
437 inc_ref_cnt(it->second,
false,
true);
444 m_active_cond.
Wait();
452 int res = io->
Fstat(st);
455 TRACE(
Error,
"GetFile, could not get valid stat");
456 }
else if (res > 0) {
458 TRACE(
Error,
"GetFile, stat returned positive value, this should NOT happen here");
460 filesize = st.st_size;
476 inc_ref_cnt(file,
false,
true);
503 dec_ref_cnt(f,
true);
510 class DiskSyncer :
public XrdJob
517 DiskSyncer(
File *f,
bool high_debug,
const char *desc =
"") :
520 m_high_debug(high_debug)
532 class CommandExecutor :
public XrdJob
535 std::string m_command_url;
538 CommandExecutor(
const std::string& command,
const char *desc =
"") :
540 m_command_url(command)
554 void Cache::schedule_file_sync(
File* f,
bool ref_cnt_already_set,
bool high_debug)
556 DiskSyncer* ds =
new DiskSyncer(f, high_debug);
558 if ( ! ref_cnt_already_set) inc_ref_cnt(f,
true, high_debug);
565 dec_ref_cnt(f, high_debug);
568 void Cache::inc_ref_cnt(
File* f,
bool lock,
bool high_debug)
574 if (lock) m_active_cond.
Lock();
576 if (lock) m_active_cond.
UnLock();
581 void Cache::dec_ref_cnt(
File* f,
bool high_debug)
601 <<
" -- deleting File object without further ado");
624 schedule_file_sync(f,
true,
true);
633 TRACE_INT(tlvl,
"dec_ref_cnt " << f->
GetLocalPath() <<
", cnt after sync_check and dec_ref_cnt = " << cnt);
647 int len = snprintf(buf, 4096,
"{\"event\":\"file_close\","
648 "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
649 "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
650 "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,\"n_cks_errs\":%d}",
660 suc = m_gstream->
Insert(buf, len + 1);
664 TRACE(
Error,
"Failed g-stream insertion of file_close record, len=" << len);
677 return m_active.find(path) != m_active.end() ||
678 m_purge_delay_set.find(path) != m_purge_delay_set.end();
690 if ( ! m_prefetch_enabled)
695 m_prefetch_condVar.
Lock();
696 m_prefetchList.push_back(file);
697 m_prefetch_condVar.
Signal();
698 m_prefetch_condVar.
UnLock();
706 if ( ! m_prefetch_enabled)
711 m_prefetch_condVar.
Lock();
712 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
716 m_prefetchList.erase(it);
720 m_prefetch_condVar.
UnLock();
726 m_prefetch_condVar.
Lock();
727 while (m_prefetchList.empty())
729 m_prefetch_condVar.
Wait();
734 size_t l = m_prefetchList.size();
735 int idx = rand() % l;
736 File* f = m_prefetchList[idx];
738 m_prefetch_condVar.
UnLock();
750 bool doPrefetch = (m_RAM_used < limit_RAM);
788 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
789 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
790 static const char *lfpReason[] = {
"ForAccess",
"ForInfo",
"ForPath" };
792 TRACE(
Debug,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why]);
794 if (buff && blen > 0) buff[0] = 0;
797 std::string f_name = url.
GetPath();
802 int ret = m_oss->
Lfn2Pfn(f_name.c_str(), buff, blen);
803 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
" -> " << ret);
809 m_purge_delay_set.insert(f_name);
812 struct stat sbuff, sbuff2;
816 if (S_ISDIR(sbuff.st_mode))
818 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
" -> EISDIR");
823 bool read_ok =
false;
824 bool is_complete =
false;
836 m_active_cond.
Lock();
838 bool is_active = m_active.find(f_name) != m_active.end();
840 if (is_active) m_active_cond.
UnLock();
844 int res = infoFile->
Open(i_name.c_str(), O_RDWR, 0600, myEnv);
847 Info info(m_trace, 0);
848 if (info.
Read(infoFile, i_name.c_str()))
855 if ( ! is_active && is_complete && why ==
ForAccess)
858 info.
Write(infoFile, i_name.c_str());
865 if ( ! is_active) m_active_cond.
UnLock();
869 if ((is_complete || why ==
ForInfo) && buff != 0)
871 int res2 = m_oss->
Lfn2Pfn(f_name.c_str(), buff, blen);
878 {mode_t mode = (forall ? worldReadable : groupReadable);
879 if (((sbuff.st_mode & worldReadable) != mode)
881 {is_complete =
false;
887 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
888 (is_complete ?
" -> FILE_COMPLETE_IN_CACHE" :
" -> EREMOTE"));
890 return is_complete ? 0 : -EREMOTE;
895 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
" -> ENOENT");
915 TRACE(
Debug,
"ConsiderFileCached '" << curl <<
"'" );
918 std::string f_name = url.
GetPath();
923 m_purge_delay_set.insert(f_name);
926 struct stat sbuff, sbuff2;
930 if (S_ISDIR(sbuff.st_mode))
932 TRACE(
Info,
"ConsiderCached '" << curl <<
", why=ForInfo" <<
" -> EISDIR");
937 bool read_ok =
false;
938 bool is_cached =
false;
950 m_active_cond.
Lock();
952 bool is_active = m_active.find(f_name) != m_active.end();
959 int res = infoFile->
Open(i_name.c_str(), O_RDWR, 0600, myEnv);
962 Info info(m_trace, 0);
963 if (info.
Read(infoFile, i_name.c_str()))
997 if (!is_active) m_active_cond.
UnLock();
1001 TRACE(
Info,
"ConsiderCached '" << curl <<
"', why=ForInfo" << (is_cached ?
" -> FILE_COMPLETE_IN_CACHE" :
" -> EREMOTE"));
1002 return is_cached ? 0 : -EREMOTE;
1007 TRACE(
Info,
"ConsiderCached '" << curl <<
"', why=ForInfo" <<
" -> ENOENT");
1024 std::string f_name = url.
GetPath();
1028 if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1030 TRACE(
Warning,
"Prepare write access requested on file " << f_name <<
". Denying access.");
1039 CommandExecutor *ce =
new CommandExecutor(f_name,
"CommandExecutor");
1049 m_purge_delay_set.insert(f_name);
1053 int res = m_oss->
Stat(i_name.c_str(), &sbuff);
1056 TRACE(Dump,
"Prepare defer open " << f_name);
1076 std::string f_name = url.
GetPath();
1080 m_purge_delay_set.insert(f_name);
1085 if (S_ISDIR(sbuff.st_mode))
1091 bool success =
false;
1096 int res = infoFile->
Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1099 Info info(m_trace, 0);
1100 if (info.
Read(infoFile, f_name.c_str()))
1108 return success ? 0 : 1;
1125 std::string f_name = url.
GetPath();
1139 it = m_active.find(f_name);
1141 if (it != m_active.end())
1145 TRACE(
Info,
"UnlinkCommon " << f_name <<
", file currently open and force not requested - denying request");
1151 if (it->second == 0)
1153 TRACE(
Info,
"UnlinkCommon " << f_name <<
", an operation on this file is ongoing - denying request");
1163 it = m_active.insert(std::make_pair(f_name, (
File*) 0)).first;
1175 int f_ret = m_oss->
Unlink(f_name.c_str());
1176 int i_ret = m_oss->
Unlink(i_name.c_str());
1178 TRACE(
Debug,
"UnlinkCommon " << f_name <<
", f_ret=" << f_ret <<
", i_ret=" << i_ret);
1186 return std::min(f_ret, i_ret);
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define TRACE_PC(act, pre_code, x)
#define TRACE_INT(act, x)
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
void * PrefetchThread(void *)
void * PurgeThread(void *)
void * ResourceMonitorHeartBeatThread(void *)
void * ProcessWriteTaskThread(void *)
int stat(const char *path, struct stat *buf)
const std::string & GetPath() const
Get the path.
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual const char * Path()=0
virtual int Fstat(struct stat &sbuff)
virtual const char * Location(bool refresh=false)
void * GetPtr(const char *varname)
void PutPtr(const char *varname, void *value)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
void FileSyncDone(File *, bool high_debug)
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
static const Configuration & Conf()
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
virtual int Stat(const char *url, struct stat &sbuff)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
void Purge()
Thread function invoked to scan and purge files from disk when needed.
void ReleaseRAM(char *buf, long long size)
virtual int ConsiderCached(const char *url)
static Cache & GetInstance()
Singleton access.
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
void DeRegisterPrefetchFile(File *)
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
void ReleaseFile(File *, IO *)
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
Obtain a new IO object that fronts existing XrdOucCacheIO.
static XrdScheduler * schedP
bool IsFileActiveOrPurgeProtected(const std::string &)
File * GetNextFileToPrefetch()
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
virtual int Unlink(const char *url)
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
static const Cache & TheOne()
char * RequestRAM(long long size)
virtual int Prepare(const char *url, int oflags, mode_t mode)
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
std::string GetRemoteLocations() const
const Info::AStat * GetLastAccessStats() const
size_t GetAccessCnt() const
int GetNDownloadedBlocks() const
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
void initiate_emergency_shutdown()
void Sync()
Sync file cache info and output data with disk.
const Stats & RefStats() const
std::string & GetLocalPath()
Stats DeltaStatsFromLastCall()
bool is_in_emergency_shutdown()
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
bool IsComplete() const
Get complete status.
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Statistics of cache utilisation by a File object.
int m_NCksumErrors
number of checksum errors while getting data from remote
void Schedule(XrdJob *jp)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
bool Insert(const char *data, int dlen)
Contains parameters configurable from the xrootd config file.
long long m_RamAbsAvailable
available from configuration
long long m_fileUsageMax
cache purge - files usage maximum
long long m_fileUsageBaseline
cache purge - files usage baseline
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
long long m_diskUsageHWM
cache purge - disk usage high water mark
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
long long m_diskUsageLWM
cache purge - disk usage low water mark
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
long long m_bufferSize
prefetch buffer size, default 1MB
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
std::string m_username
username passed to oss plugin
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
long long BytesHit
read from cache
long long BytesBypassed
read from remote and dropped
time_t DetachTime
close time
long long BytesMissed
read from remote and cached
time_t AttachTime
open time