43 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49 const char *File::m_traceID =
"File";
53 File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
57 m_cfi(
Cache::GetInstance().
GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
65 m_detach_time_logged(false),
70 m_prefetch_state(kOff),
71 m_prefetch_read_cnt(0),
72 m_prefetch_hit_cnt(0),
94 TRACEF(
Debug,
"~File() ended, prefetch score = " << m_prefetch_score);
101 File *file =
new File(path, offset, fileSize);
129 m_in_shutdown =
true;
131 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
133 m_prefetch_state = kStopped;
134 cache()->DeRegisterPrefetchFile(
this);
146 Stats delta = m_last_stats;
148 m_last_stats = m_stats.
Clone();
159 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
167 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
171 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
183 insert_remote_location(loc);
199 IoSet_i mi = m_io_set.find(io);
201 if (mi != m_io_set.end())
206 ", active_reads " << n_active_reads <<
207 ", active_prefetches " << io->m_active_prefetches <<
208 ", allow_prefetching " << io->m_allow_prefetching <<
209 ", ios_in_detach " << m_ios_in_detach);
211 "\tio_map.size() " << m_io_set.size() <<
212 ", block_map.size() " << m_block_map.size() <<
", file");
214 insert_remote_location(loc);
216 io->m_allow_prefetching =
false;
217 io->m_in_detach =
true;
220 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
222 if ( ! select_current_io_or_disable_prefetching(
false) )
224 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
231 bool io_active_result;
233 if (n_active_reads > 0)
235 io_active_result =
true;
237 else if (m_io_set.size() - m_ios_in_detach == 1)
239 io_active_result = ! m_block_map.empty();
243 io_active_result = io->m_active_prefetches > 0;
246 if ( ! io_active_result)
251 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
253 return io_active_result;
257 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
268 m_detach_time_logged =
false;
277 if ( ! m_in_shutdown)
279 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
283 m_detach_time_logged =
true;
285 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
289 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
301 time_t now = time(0);
306 IoSet_i mi = m_io_set.find(io);
308 if (mi == m_io_set.end())
311 io->m_attach_time = now;
314 insert_remote_location(loc);
316 if (m_prefetch_state == kStopped)
318 m_prefetch_state = kOn;
319 cache()->RegisterPrefetchFile(
this);
324 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
338 time_t now = time(0);
342 IoSet_i mi = m_io_set.find(io);
344 if (mi != m_io_set.end())
346 if (mi == m_current_io)
351 m_stats.
IoDetach(now - io->m_attach_time);
355 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
357 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
358 m_prefetch_state = kStopped;
359 cache()->DeRegisterPrefetchFile(
this);
364 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
376 static const char *tpfx =
"Open() ";
378 TRACEF(Dump, tpfx <<
"open file for disk cache");
385 struct stat data_stat, info_stat;
389 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
390 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
393 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
394 myEnv.
Put(
"oss.asize", size_str);
406 m_data_file = myOss.
newFile(myUser);
407 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
411 delete m_data_file; m_data_file = 0;
415 myEnv.
Put(
"oss.asize",
"64k");
421 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
425 m_info_file = myOss.
newFile(myUser);
426 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
430 delete m_info_file; m_info_file = 0;
431 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
435 bool initialize_info_file =
true;
437 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
439 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
440 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
446 initialize_info_file =
false;
448 TRACEF(
Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
459 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
460 initialize_info_file =
true;
469 if (initialize_info_file)
474 m_cfi.
Write(m_info_file, ifn.c_str());
475 m_info_file->
Fsync();
476 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.
GetNBlocks());
483 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
494 bool File::overlap(
int blk,
503 const long long beg = blk * blk_size;
504 const long long end = beg + blk_size;
505 const long long req_end = req_off + req_size;
507 if (req_off < end && req_end > beg)
509 const long long ovlp_beg = std::max(beg, req_off);
510 const long long ovlp_end = std::min(end, req_end);
512 off = ovlp_beg - req_off;
513 blk_off = ovlp_beg - beg;
514 size = (int) (ovlp_end - ovlp_beg);
516 assert(size <= blk_size);
527 Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
535 const long long off = i * m_block_size;
536 const int last_block = m_num_blocks - 1;
537 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
539 int blk_size, req_size;
540 if (i == last_block) {
541 blk_size = req_size = m_file_size - off;
542 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
544 blk_size = req_size = m_block_size;
548 char *buf = cache()->RequestRAM(req_size);
552 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
562 m_prefetch_state = kHold;
563 cache()->DeRegisterPrefetchFile(
this);
568 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
575 void File::ProcessBlockRequest(
Block *b)
583 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
585 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
601 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
603 ProcessBlockRequest(*bi);
609 void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
611 int n_chunks = ioVec.size();
614 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
615 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
625 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
630 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
632 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
634 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
638 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
642 if (rs != expected_size)
644 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
663 if (m_in_shutdown || io->m_in_detach)
666 return m_in_shutdown ? -ENOENT : -EBADF;
674 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
679 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
681 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
688 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
692 if (m_in_shutdown || io->m_in_detach)
695 return m_in_shutdown ? -ENOENT : -EBADF;
708 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
713 int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
725 int prefetch_cnt = 0;
730 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
732 std::vector<XrdOucIOVec> iovec_disk;
733 std::vector<XrdOucIOVec> iovec_direct;
734 int iovec_disk_total = 0;
735 int iovec_direct_total = 0;
737 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
744 const int idx_first = iUserOff / m_block_size;
745 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
747 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
749 enum LastBlock_e { LB_other, LB_disk, LB_direct };
751 LastBlock_e lbe = LB_other;
753 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
756 BlockMap_i bi = m_block_map.find(block_idx);
763 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
766 if (bi != m_block_map.end())
768 inc_ref_count(bi->second);
769 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
771 if (bi->second->is_finished())
775 assert(bi->second->is_ok());
777 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
779 if (bi->second->m_prefetch)
790 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
799 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
802 iovec_disk.back().size += size;
804 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
805 iovec_disk_total += size;
819 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
822 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
824 blks_to_request.push_back(b);
833 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
835 iovec_direct_total += size;
842 iovec_direct.back().size += size;
844 long long in_offset = block_idx * m_block_size + blk_off;
845 char *out_pos = iUserBuff + off;
852 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
861 inc_prefetch_hit_cnt(prefetch_cnt);
866 if ( ! blks_to_request.empty())
868 ProcessBlockRequests(blks_to_request);
869 blks_to_request.clear();
873 if ( ! iovec_direct.empty())
875 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
877 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
882 long long bytes_read = 0;
886 if ( ! blks_ready.empty())
888 for (
auto &bvi : blks_ready)
890 for (
auto &cr : bvi.second)
892 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
893 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
894 bytes_read += cr.m_size;
900 if ( ! iovec_disk.empty())
902 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
903 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
920 for (
auto &bvi : blks_ready)
921 dec_ref_count(bvi.first, (
int) bvi.second.size());
954 return error_cond ? error_cond : bytes_read;
966 long long offset = b->
m_offset - m_offset;
986 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
996 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
999 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1001 bool schedule_sync =
false;
1022 m_writes_during_sync.push_back(blk_idx);
1027 ++m_non_flushed_cnt;
1031 schedule_sync =
true;
1033 m_non_flushed_cnt = 0;
1040 cache()->ScheduleFileSync(
this);
1050 int ret = m_data_file->
Fsync();
1051 bool errorp =
false;
1056 m_cfi.
Write(m_info_file, m_filename.c_str());
1057 int cret = m_info_file->
Fsync();
1060 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1066 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1072 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1079 m_writes_during_sync.clear();
1085 int written_while_in_sync;
1086 bool resync =
false;
1089 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1093 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1094 m_writes_during_sync.clear();
1098 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1103 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1114 void File::free_block(
Block* b)
1117 int i = b->
m_offset / m_block_size;
1118 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1119 size_t ret = m_block_map.erase(i);
1123 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1133 m_prefetch_state = kOn;
1134 cache()->RegisterPrefetchFile(
this);
1140 bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1144 int io_size = (int) m_io_set.size();
1149 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1152 m_current_io = m_io_set.begin();
1155 else if (io_size > 1)
1157 IoSet_i mi = m_current_io;
1158 if (skip_current && mi != m_io_set.end()) ++mi;
1160 for (
int i = 0; i < io_size; ++i)
1162 if (mi == m_io_set.end()) mi = m_io_set.begin();
1164 if ((*mi)->m_allow_prefetching)
1176 m_current_io = m_io_set.end();
1177 m_prefetch_state = kStopped;
1178 cache()->DeRegisterPrefetchFile(
this);
1186 void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1192 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1194 m_state_cond.
Lock();
1210 FinalizeReadRequest(rreq);
1237 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1240 m_state_cond.
Lock();
1245 rreq->m_stats.m_BytesMissed += creq.
m_size;
1247 rreq->m_stats.m_BytesHit += creq.
m_size;
1249 --rreq->m_n_chunk_reqs;
1252 inc_prefetch_hit_cnt(1);
1256 bool rreq_complete = rreq->is_complete();
1261 FinalizeReadRequest(rreq);
1275 void File::ProcessBlockResponse(
Block *b,
int res)
1277 static const char* tpfx =
"ProcessBlockResponse ";
1279 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1281 if (res >= 0 && res != b->
get_size())
1285 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1289 m_state_cond.
Lock();
1295 IoSet_i mi = m_io_set.find(io);
1296 if (mi != m_io_set.end())
1298 --io->m_active_prefetches;
1301 if (res < 0 && io->m_allow_prefetching)
1303 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1304 io->m_allow_prefetching =
false;
1307 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1309 if ( ! select_current_io_or_disable_prefetching(
false) )
1311 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1317 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1333 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1334 if ( ! m_in_shutdown)
1339 cache()->AddWriteTask(b,
true);
1348 for (
auto &creq : creqs_to_notify)
1350 ProcessBlockSuccess(b, creq);
1359 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1370 std::list<ReadRequest*> rreqs_to_complete;
1379 ProcessBlockError(b, rreq);
1382 rreqs_to_complete.push_back(rreq);
1387 creqs_to_keep.push_back(creq);
1391 bool reissue =
false;
1392 if ( ! creqs_to_keep.empty())
1394 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1396 TRACEF(
Info,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1397 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1406 for (
auto rreq : rreqs_to_complete)
1407 FinalizeReadRequest(rreq);
1410 ProcessBlockRequest(b);
1418 return m_filename.c_str();
1423 int File::offsetIdx(
int iIdx)
const
1425 return iIdx - m_offset/m_block_size;
1439 TRACEF(DumpXL,
"Prefetch() entering.");
1443 if (m_prefetch_state != kOn)
1448 if ( ! select_current_io_or_disable_prefetching(
true) )
1450 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1455 for (
int f = 0; f < m_num_blocks; ++f)
1459 int f_act = f + m_offset / m_block_size;
1461 BlockMap_i bi = m_block_map.find(f_act);
1462 if (bi == m_block_map.end())
1464 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1467 TRACEF(Dump,
"Prefetch take block " << f_act);
1471 inc_prefetch_read_cnt(1);
1476 TRACEF(
Warning,
"Prefetch allocation failed for block " << f_act);
1485 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1486 m_prefetch_state = kComplete;
1487 cache()->DeRegisterPrefetchFile(
this);
1491 (*m_current_io)->m_active_prefetches += (int) blks.size();
1495 if ( ! blks.empty())
1497 ProcessBlockRequests(blks);
1506 return m_prefetch_score;
1519 void File::insert_remote_location(
const std::string &loc)
1523 size_t p = loc.find_first_of(
'@');
1524 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1531 if ( ! m_remote_locations.empty())
1535 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1539 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1542 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1544 s +=
'"'; s += *i; s +=
'"';
1545 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
int stat(const char *path, struct stat *buf)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
void * get_req_id() const
long long get_offset() const
vChunkRequest_t m_chunk_reqs
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache info and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
std::string & GetLocalPath()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Return true if IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
XrdOucCacheIO * GetInput()
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
number of unsynced blocks on disk before flush is called
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)