29 #include <sys/types.h>
37 #include <radosstriper/libradosstriper.hpp>
42 #include <sys/xattr.h>
80 typedef std::map<std::string, libradosstriper::RadosStriper*>
StriperDict;
82 typedef std::map<std::string, librados::IoCtx*>
IOCtxDict;
103 std::map<unsigned int, CephFileRef>
g_fds;
151 std::map<unsigned int, CephFileRef>::iterator it =
g_fds.find(fd);
152 if (it !=
g_fds.end()) {
157 return &(it->second);
166 if (fr.
flags & (O_WRONLY|O_RDWR)) {
169 std::map<unsigned int, CephFileRef>::iterator it =
g_fds.find(fd);
170 if (it !=
g_fds.end()) {
183 if (fr.
flags & (O_WRONLY|O_RDWR)) {
206 va_start(arg, format);
207 (*g_logfunc)(format, arg);
212 static unsigned long long int stoull(
const std::string &s) {
215 unsigned long long int res = strtoull(s.c_str(), &end, 10);
217 throw std::invalid_argument(s);
219 if (ERANGE == errno) {
220 throw std::out_of_range(s);
226 static unsigned int stoui(
const std::string &s) {
229 unsigned long int res = strtoul(s.c_str(), &end, 10);
231 throw std::invalid_argument(s);
233 if (ERANGE == errno || res > std::numeric_limits<unsigned int>::max()) {
234 throw std::out_of_range(s);
236 return (
unsigned int)res;
248 int n_fds =
g_fds.size();
250 int n_stripers_pool = 0;
254 std::stringstream ss;
255 ss <<
"Counts: " << n_cluster <<
" " << n_ioCtx <<
" " << n_filesOpenForWrite <<
" "
256 << n_fds <<
" " << n_stripers <<
" " << n_stripers_pool <<
" " << n_stripers_pool
257 <<
" CountsbyCluster: [";
259 ss << el.first <<
":" << el.second <<
", " ;
263 logwrapper((
char*)
"dumpClusterInfo : %s", ss.str().c_str());
272 size_t atPos = params.find(
'@');
273 if (std::string::npos != atPos) {
274 file.
userId = params.substr(0, atPos);
278 char* cuser = env->
Get(
"cephUserId");
293 size_t comPos = params.find(
',', offset);
294 if (std::string::npos == comPos) {
295 if (params.size() == offset) {
297 char* cpool = env->
Get(
"cephPool");
303 file.
pool = params.substr(offset);
305 return params.size();
307 file.
pool = params.substr(offset, comPos-offset);
319 size_t comPos = params.find(
',', offset);
320 if (std::string::npos == comPos) {
321 if (params.size() == offset) {
323 char* cNbStripes = env->
Get(
"cephNbStripes");
324 if (0 != cNbStripes) {
331 return params.size();
345 size_t comPos = params.find(
',', offset);
346 if (std::string::npos == comPos) {
347 if (params.size() == offset) {
349 char* cStripeUnit = env->
Get(
"cephStripeUnit");
350 if (0 != cStripeUnit) {
357 return params.size();
371 if (params.size() == offset) {
373 char* cObjectSize = env->
Get(
"cephObjectSize");
374 if (0 != cObjectSize) {
388 unsigned int afterPool =
fillCephPool(params, afterUser, env, file);
408 char physCName[MAXPATHLEN+1];
409 int retc =
g_namelib->
lfn2pfn(logName.c_str(), physCName,
sizeof(physCName));
411 logwrapper((
char*)
"ceph_namelib : failed to translate %s using namelib plugin, using it as is", logName.c_str());
414 logwrapper((
char*)
"ceph_namelib : translated %s to %s", logName.c_str(), physCName);
415 physName = physCName;
433 std::string spath {path};
436 size_t colonPos = spath.find(
':');
437 if (std::string::npos == colonPos) {
442 file.
name = spath.substr(colonPos+1);
454 mode_t mode,
unsigned long long offset) {
480 librados::Rados *cluster =
new librados::Rados;
484 int rc = cluster->init(userId.c_str());
486 logwrapper((
char*)
"checkAndCreateCluster : cluster init failed");
490 rc = cluster->conf_read_file(NULL);
492 logwrapper((
char*)
"checkAndCreateCluster : cluster read config failed, rc = %d", rc);
497 cluster->conf_parse_env(NULL);
498 rc = cluster->connect();
500 logwrapper((
char*)
"checkAndCreateCluster : cluster connect failed, rc = %d", rc);
512 StriperDict::iterator it = sDict.find(userAtPool);
513 if (it == sDict.end()) {
518 logwrapper((
char*)
"checkAndCreateStriper : checkAndCreateCluster failed");
522 librados::IoCtx *ioctx =
new librados::IoCtx;
524 logwrapper((
char*)
"checkAndCreateStriper : IoCtx instantiation failed");
530 int rc =
g_cluster[cephPoolIdx]->ioctx_create(file.
pool.c_str(), *ioctx);
532 logwrapper((
char*)
"checkAndCreateStriper : ioctx_create failed, rc = %d", rc);
540 libradosstriper::RadosStriper *striper =
new libradosstriper::RadosStriper;
542 logwrapper((
char*)
"checkAndCreateStriper : RadosStriper instantiation failed");
549 rc = libradosstriper::RadosStriper::striper_create(*ioctx, striper);
551 logwrapper((
char*)
"checkAndCreateStriper : striper_create failed, rc = %d", rc);
560 rc = striper->set_object_layout_stripe_count(file.
nbStripes);
570 rc = striper->set_object_layout_stripe_unit(file.
stripeUnit);
572 logwrapper((
char*)
"checkAndCreateStriper : invalid stripeUnit %d (must be non 0, multiple of 64K)", file.
stripeUnit);
580 rc = striper->set_object_layout_object_size(file.
objectSize);
582 logwrapper((
char*)
"checkAndCreateStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.
objectSize);
591 ioDict.emplace(userAtPool, ioctx);
592 sDict.emplace(userAtPool, striper);
599 std::stringstream ss;
602 std::string userAtPool = ss.str();
605 logwrapper((
char*)
"getRadosStriper : checkAndCreateStriper failed");
613 std::stringstream ss;
616 std::string userAtPool = ss.str();
621 return g_ioCtx[cephPoolIdx][userAtPool];
632 for (IOCtxDict::iterator it2 =
g_ioCtx[i].begin();
670 if (NULL == striper) {
676 int rc = striper->stat(fr.
name, (uint64_t*)&(buf.st_size), &(buf.st_atime));
679 bool fileExists = (rc != -ENOENT);
681 logwrapper((
char*)
"Access Mode: %s flags&O_ACCMODE %d ", pathname, flags);
683 if ((flags&O_ACCMODE) == O_RDONLY) {
686 librados::bufferlist d_stripeUnit;
687 librados::bufferlist d_objectSize;
688 std::string obj_name;
689 librados::IoCtx *context =
getIoCtx(fr);
697 obj_name = fr.
name + std::string(
".0000000000000000");
698 }
catch (std::bad_alloc&) {
699 logwrapper((
char*)
"Can not create object string for file %s)", fr.
name.c_str());
703 ret = context->getxattr(obj_name,
"striper.layout.stripe_unit", d_stripeUnit);
704 ret = std::min(ret,context->getxattr(obj_name,
"striper.layout.object_size", d_objectSize));
707 logwrapper((
char*)
"Could not find size or stripe_unit xattr for %s", fr.
name.c_str());
713 unsigned int stripeUnitLength = std::min((
unsigned int)
MAXDIGITSIZE-1, d_stripeUnit.length());
714 unsigned int objectSizeLength = std::min((
unsigned int)
MAXDIGITSIZE-1, d_objectSize.length());
715 (void)strncpy( cleanStripeUnit, d_stripeUnit.c_str(), stripeUnitLength );
716 (void)strncpy( cleanObjectSize, d_objectSize.c_str(), objectSizeLength );
717 cleanStripeUnit[stripeUnitLength] =
'\0';
718 cleanObjectSize[objectSizeLength] =
'\0';
721 logwrapper((
char*)
"WARNING: stripe unit of %s does not match defaults. object size is %s", pathname, cleanStripeUnit);
725 logwrapper((
char*)
"WARNING: object size of %s does not match defaults. object size is %s",pathname, cleanObjectSize);
730 logwrapper((
char*)
"File descriptor %d associated to file %s opened in read mode", fd, pathname);
738 if (flags & O_TRUNC) {
740 if (rc < 0 && rc != -ENOENT) {
744 if (flags & O_EXCL) {
753 logwrapper((
char*)
"File descriptor %d associated to file %s opened in write mode", fd, pathname);
764 ::gettimeofday(&now,
nullptr);
766 double lastAsyncAge = 0.0;
772 logwrapper((
char*)
"ceph_close: closed fd %d for file %s, read ops count %d, write ops count %d, "
773 "async write ops %d/%d, async pending write bytes %ld, "
774 "async read ops %d/%d, bytes written/max offset %ld/%ld, "
775 "longest async write %f, longest callback invocation %f, last async op age %f",
804 logwrapper((
char*)
"ceph_lseek: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
814 logwrapper((
char*)
"ceph_lseek64: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
824 logwrapper((
char*)
"ceph_write: for fd %d, count=%d", fd, count);
825 if ((fr->
flags & (O_WRONLY|O_RDWR)) == 0) {
833 bl.append((
const char*)buf, count);
834 int rc = striper->write(fr->
name, bl, count, fr->
offset);
852 if ((fr->
flags & (O_WRONLY|O_RDWR)) == 0) {
860 bl.append((
const char*)buf, count);
861 int rc = striper->write(fr->
name, bl, count, offset);
875 size_t rc = rados_aio_get_return_value(c);
887 ::gettimeofday(&now,
nullptr);
888 double writeTime = 0.000001 * (now.tv_usec - awa->
startTime.tv_usec) + 1.0 * (now.tv_sec - awa->
startTime.tv_sec);
891 ::timeval before, after;
892 if (fr) ::gettimeofday(&before,
nullptr);
895 ::gettimeofday(&after,
nullptr);
896 double callbackInvocationTime = 0.000001 * (after.tv_usec - before.tv_usec) + 1.0 * (after.tv_sec - before.tv_sec);
912 if ((fr->
flags & (O_WRONLY|O_RDWR)) == 0) {
922 bl.append(buf, count);
932 librados::AioCompletion *completion =
935 int rc = striper->aio_write(fr->
name, completion, bl, count, offset);
936 completion->release();
952 if ((fr->
flags & O_WRONLY) != 0) {
963 librados::IoCtx *ioctx =
getIoCtx(*fr);
972 for (
int i = 0; i < n; i++) {
973 rc = readOp.
read(readV[i].data, readV[i].size, readV[i].offset);
975 logwrapper( (
char*)
"Can not declare read request\n");
980 std::time_t wait_time = std::time(0);
982 wait_time = std::time(0) - wait_time;
985 (
char*)
"Waiting for AIO results in readv for %s took %ld seconds, too long!\n",
991 logwrapper( (
char*)
"Can not submit read requests\n");
999 }
catch(std::bad_alloc&) {
1011 ssize_t nbytes = 0, curCount = 0;
1012 for (
int i=0; i<n; i++) {
1013 curCount =
ceph_posix_pread(fd, (
void *)readV[i].data, (
size_t)readV[i].size, (off_t)readV[i].offset);
1014 if (curCount != readV[i].size) {
1015 if (curCount < 0)
return curCount;
1028 if ((fr->
flags & O_WRONLY) != 0) {
1035 ceph::bufferlist bl;
1036 int rc = striper->read(fr->
name, &bl, count, fr->
offset);
1037 if (rc < 0)
return rc;
1038 bl.begin().copy(rc, (
char*)buf);
1055 if ((fr->
flags & O_WRONLY) != 0) {
1066 librados::IoCtx *ioctx =
getIoCtx(*fr);
1074 rc = readOp.
read(buf, count, offset);
1076 logwrapper( (
char*)
"Can not declare read request\n");
1079 std::time_t wait_time = std::time(0);
1081 wait_time = std::time(0) - wait_time;
1084 (
char*)
"Waiting for AIO results in pread for %s took %ld seconds, too long!\n",
1090 logwrapper( (
char*)
"Can not submit read request\n");
1095 if (bytes_read > 0) {
1099 logwrapper( (
char*)
"Error while read: %d\n", bytes_read);
1102 }
catch (std::bad_alloc&) {
1115 if ((fr->
flags & O_WRONLY) != 0) {
1122 ceph::bufferlist bl;
1123 int rc = striper->read(fr->
name, &bl, count, offset);
1124 if (rc < 0)
return rc;
1125 bl.begin().copy(rc, (
char*)buf);
1136 if (!allowStriper) {
1141 if (-ENOENT == rc || -ENOTSUP == rc) {
1146 snprintf(err_str, 100,
"WARNING! The file (fd %d) seem to be sparse, this is not expected", fd);
1156 size_t rc = rados_aio_get_return_value(c);
1183 if ((fr->
flags & O_WRONLY) != 0) {
1192 ceph::bufferlist *bl =
new ceph::bufferlist();
1202 librados::AioCompletion *completion =
1205 int rc = striper->aio_read(fr->
name, completion, bl, count, offset);
1206 completion->release();
1218 logwrapper((
char*)__FUNCTION__,
": fd %d", fd);
1224 logwrapper((
char*)
"ceph_stat: getRadosStriper failed");
1227 memset(buf, 0,
sizeof(*buf));
1228 int rc = striper->stat(fr->
name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1234 buf->st_mtime = buf->st_atime;
1235 buf->st_ctime = buf->st_atime;
1236 buf->st_mode = 0666 | S_IFREG;
1253 memset(buf, 0,
sizeof(*buf));
1254 int rc = striper->stat(file.
name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1260 buf->st_atime = time(NULL);
1269 buf->st_mtime = buf->st_atime;
1270 buf->st_ctime = buf->st_atime;
1271 buf->st_mode = 0666 | S_IFREG;
1289 logwrapper((
char*)
"ceph_fcntl: fd %d cmd=%d", fd, cmd);
1303 void* value,
size_t size) {
1308 ceph::bufferlist bl;
1309 int rc = striper->getxattr(file.
name, name, bl);
1310 if (rc < 0)
return rc;
1311 size_t returned_size = (size_t)rc<size?rc:size;
1312 bl.begin().copy(returned_size, (
char*)value);
1313 return returned_size;
1317 const char* name,
void* value,
1319 logwrapper((
char*)
"ceph_getxattr: path %s name=%s", path, name);
1324 void* value,
size_t size) {
1327 logwrapper((
char*)
"ceph_fgetxattr: fd %d name=%s", fd, name);
1335 const void* value,
size_t size,
int flags) {
1340 ceph::bufferlist bl;
1341 bl.append((
const char*)value, size);
1342 int rc = striper->setxattr(file.
name, name, bl);
1350 const char* name,
const void* value,
1351 size_t size,
int flags) {
1352 logwrapper((
char*)
"ceph_setxattr: path %s name=%s value=%s", path, name, value);
1357 const char* name,
const void* value,
1358 size_t size,
int flags) {
1361 logwrapper((
char*)
"ceph_fsetxattr: fd %d name=%s value=%s", fd, name, value);
1373 int rc = striper->rmxattr(file.
name, name);
1382 logwrapper((
char*)
"ceph_removexattr: path %s name=%s", path, name);
1389 logwrapper((
char*)
"ceph_fremovexattr: fd %d name=%s", fd, name);
1402 std::map<std::string, ceph::bufferlist> attrset;
1403 int rc = striper->getxattrs(file.
name, attrset);
1410 for (std::map<std::string, ceph::bufferlist>::const_iterator it = attrset.begin();
1411 it != attrset.end();
1414 newItem->
Next = *aPL;
1415 newItem->
Vlen = it->second.length();
1416 if (newItem->
Vlen > maxSize) {
1417 maxSize = newItem->
Vlen;
1419 newItem->
Nlen = it->first.size();
1420 strncpy(newItem->
Name, it->first.c_str(), newItem->
Vlen+1);
1431 logwrapper((
char*)
"ceph_listxattrs: path %s", path);
1438 logwrapper((
char*)
"ceph_flistxattrs: fd %d", fd);
1464 librados::cluster_stat_t result;
1465 int rc = cluster->cluster_stat(result);
1467 *totalSpace = result.kb * 1024;
1468 *freeSpace = result.kb_avail * 1024;
1501 std::list<std::string> poolNames({poolName});
1502 std::map<std::string, librados::pool_stat_t>
stat;
1504 if (cluster->get_pool_stats(poolNames,
stat) < 0) {
1506 logwrapper((
char*)
"Unable to get_pool_stats for pool ", poolName);
1511 *usedSpace =
stat[poolName].num_kb * 1024;
1522 return striper->trunc(file.
name, size);
1528 logwrapper((
char*)
"ceph_posix_ftruncate: fd %d, size %d", fd, size);
1536 logwrapper((
char*)
"ceph_posix_truncate : %s", pathname);
1543 logwrapper((
char*)
"ceph_posix_unlink : %s", pathname);
1545 auto timer_start = std::chrono::steady_clock::now();
1553 int rc = striper->remove(file.
name);
1554 auto end = std::chrono::steady_clock::now();
1555 auto deltime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - timer_start).count();
1558 logwrapper((
char*)
"ceph_posix_unlink : %s unlink successful: %d ms", pathname, deltime_ms);
1562 logwrapper((
char*)
"ceph_posix_unlink : %s unlink failed: %d ms; return code %d", pathname, deltime_ms, rc);
1566 logwrapper((
char*)
"ceph_posix_unlink : unlink failed with -EBUSY %s, now trying to remove lock.", pathname);
1571 logwrapper((
char*)
"ceph_posix_unlink : unlink rmxattr failed %s, %d", pathname, rc);
1576 rc = striper->remove(file.
name);
1577 end = std::chrono::steady_clock::now();
1578 deltime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - timer_start).count();
1581 logwrapper((
char*)
"ceph_posix_unlink : unlink failed after lock removal %s, %d ms", pathname, deltime_ms);
1583 logwrapper((
char*)
"ceph_posix_unlink : unlink suceeded after lock removal %s, %d ms", pathname, deltime_ms);
1589 logwrapper((
char*)
"ceph_posix_opendir : %s", pathname);
1592 if (file.
name.size() != 1 || file.
name[0] !=
'/') {
1596 librados::IoCtx *ioctx =
getIoCtx(file);
1608 librados::NObjectIterator &iterator = ((
DirIterator*)dirp)->m_iterator;
1609 librados::IoCtx *ioctx = ((
DirIterator*)dirp)->m_ioctx;
1610 while (iterator->get_oid().compare(iterator->get_oid().size()-17, 17,
".0000000000000000") &&
1611 iterator != ioctx->nobjects_end()) {
1614 if (iterator == ioctx->nobjects_end()) {
1617 int l = iterator->get_oid().size()-17;
1618 if (l < blen) blen = l;
1619 strncpy(buff, iterator->get_oid().c_str(), blen-1);
unsigned int g_cephPoolIdx
index of current Striper/IoCtx to be used
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper)
ssize_t ceph_posix_write(int fd, const void *buf, size_t count)
unsigned int getCephPoolIdxAndIncrease()
void ceph_posix_set_logfunc(void(*logfunc)(char *, va_list argp))
int ceph_posix_truncate(XrdOucEnv *env, const char *pathname, unsigned long long size)
CephFile g_defaultParams
global variable containing defaults for CephFiles
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_unlink(XrdOucEnv *env, const char *pathname)
XrdOucName2Name * g_namelib
void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_readdir(DIR *dirp, char *buff, int blen)
std::multiset< std::string > g_filesOpenForWrite
global variable holding a list of files currently opened for write
int ceph_posix_fcntl(int fd, int cmd,...)
static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char *name, const void *value, size_t size, int flags)
ssize_t ceph_posix_getxattr(XrdOucEnv *env, const char *path, const char *name, void *value, size_t size)
DIR * ceph_posix_opendir(XrdOucEnv *env, const char *pathname)
std::vector< librados::Rados * > g_cluster
static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static void ceph_aio_write_complete(rados_completion_t c, void *arg)
static CephFile getCephFile(const char *path, XrdOucEnv *env)
ssize_t ceph_nonstriper_readv(int fd, XrdOucIOVec *readV, int n)
XrdSysMutex g_striper_mutex
mutex protecting the striper and ioctx maps
std::map< std::string, libradosstriper::RadosStriper * > StriperDict
ssize_t ceph_posix_read(int fd, void *buf, size_t count)
static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset)
int ceph_posix_listxattrs(XrdOucEnv *env, const char *path, XrdSysXAttr::AList **aPL, int getSz)
int ceph_posix_fstat(int fd, struct stat *buf)
static unsigned int stoui(const std::string &s)
simple integer parsing, to be replaced by std::stoi when C++11 can be used
void ceph_posix_disconnect_all()
int ceph_posix_fsync(int fd)
XrdSysMutex g_init_mutex
mutex protecting initialization of ceph clusters
int ceph_posix_closedir(DIR *dirp)
static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
std::map< unsigned int, CephFileRef > g_fds
global variable holding a map of file descriptor to file reference
unsigned int g_nextCephFd
global variable remembering the next available file descriptor
static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char *name, void *value, size_t size)
std::vector< IOCtxDict > g_ioCtx
int insertFileRef(CephFileRef &fr)
librados::Rados * checkAndCreateCluster(unsigned int cephPoolIdx, std::string userId=g_defaultParams.userId)
static libradosstriper::RadosStriper * getRadosStriper(const CephFile &file)
int ceph_posix_statfs(long long *totalSpace, long long *freeSpace)
int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile &file)
unsigned int g_maxCephPoolIdx
static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static librados::IoCtx * getIoCtx(const CephFile &file)
static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags, mode_t mode, unsigned long long offset)
int ceph_posix_fremovexattr(int fd, const char *name)
void translateFileName(std::string &physName, std::string logName)
converts a logical filename to physical one if needed
int ceph_posix_close(int fd)
int ceph_posix_stat_pool(char const *poolName, long long *usedSpace)
Return the amount of space used in a pool.
librados::IoCtx * m_ioctx
std::map< unsigned int, unsigned long long > g_idxCntr
std::map< std::string, librados::IoCtx * > IOCtxDict
off_t ceph_posix_lseek(int fd, off_t offset, int whence)
void ceph_posix_set_defaults(const char *value)
void deleteFileRef(int fd, const CephFileRef &fr)
deletes a FileRef from the global table of file descriptors
int ceph_posix_fsetxattr(int fd, const char *name, const void *value, size_t size, int flags)
static void ceph_aio_read_complete(rados_completion_t c, void *arg)
int ceph_posix_ftruncate(int fd, unsigned long long size)
static int ceph_posix_internal_removexattr(const CephFile &file, const char *name)
std::vector< StriperDict > g_radosStripers
int ceph_posix_open(XrdOucEnv *env, const char *pathname, int flags, mode_t mode)
librados::NObjectIterator m_iterator
ssize_t ceph_posix_nonstriper_pread(int fd, void *buf, size_t count, off64_t offset)
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
static void logwrapper(char *format,...)
int ceph_posix_removexattr(XrdOucEnv *env, const char *path, const char *name)
off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence)
ssize_t ceph_striper_readv(int fd, XrdOucIOVec *readV, int n)
int ceph_posix_stat(XrdOucEnv *env, const char *pathname, struct stat *buf)
bool isOpenForWrite(std::string &name)
check whether a file is open for write
static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file)
void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file)
fill a ceph file struct from a path and an environment
ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
unsigned int g_cephAioWaitThresh
std::string g_defaultPool
XrdSysMutex g_fd_mutex
mutex protecting the map of file descriptors and the openForWrite multiset
ssize_t ceph_posix_setxattr(XrdOucEnv *env, const char *path, const char *name, const void *value, size_t size, int flags)
static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size)
static void(* g_logfunc)(char *, va_list argp)=0
global variable for the log function
std::string g_defaultUserId
CephFileRef * getFileRef(int fd)
look for a FileRef from its file descriptor
int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz)
ssize_t ceph_posix_fgetxattr(int fd, const char *name, void *value, size_t size)
static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz)
static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence)
void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL)
small struct for directory listing
void() AioCB(XrdSfsAio *, size_t)
int stat(const char *path, struct stat *buf)
char * Get(const char *varname)
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value.
int Nlen
The length of the attribute name that follows.
AList * Next
-> next element.
int read(void *out_buf, size_t size, off64_t offset)
int submit_and_wait_for_complete()
small struct for aio API callbacks
AioArgs(XrdSfsAio *a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0)
uint64_t bytesAsyncWritePending
unsigned asyncRdStartCount
unsigned asyncWrStartCount
uint64_t maxOffsetWritten
::timeval lastAsyncSubmission
double longestCallbackInvocation
double longestAsyncWriteTime
unsigned asyncWrCompletionCount
unsigned asyncRdCompletionCount
small structs to store file metadata
unsigned long long stripeUnit
unsigned long long objectSize