37 #include <netinet/in.h>
38 #include <sys/types.h>
83 {nodeP->Delete(
Cluster.STMutex);
86 if (!
Cluster.Drop(nodeEnt, nodeInst,
this))
delete this;
91 nodeEnt(0), nodeInst(0)
95 nodeEnt(nid), nodeInst(inst)
111 memset((
void *)NodeTab, 0,
sizeof(NodeTab));
112 memset((
void *)AltMans, (
int)
' ',
sizeof(AltMans));
121 peerMask = ~peerHost;
129 const char *theNID,
const char *theIF)
133 const char *act =
"";
137 int tmp, Slot, Free = -1, Bump1 = -1, Bump2 = -1, Bump3 = -1, aSet = 0;
139 bool SpecAlt = (Special && !(Status &
CMS_isSuper));
149 for (Slot = 0; Slot <
STMax; Slot++)
151 {
if (NodeTab[Slot]->isNode(lp, theNID, port))
break;
152 if (NodeTab[Slot]->isConn)
153 {
if (!NodeTab[Slot]->isPerm && Special)
156 if ( NodeTab[Slot]->isPerm)
157 {
if (Bump3 < 0 && Special) Bump3 = Slot;}
160 }
else if (Free < 0) Free = Slot;
165 {
if (NodeTab[Slot] && NodeTab[Slot]->isBound)
166 {
Say.
Emsg(
"Cluster", lp->
ID,
"already logged in.");
184 {
if (!(nP = AddAlt(cidP, lp, port, Status, sport, theNID, theIF)))
186 aSet = 1; Slot = nP->NodeID;
187 if (nP != NodeTab[Slot]) {Hidden =
true; act =
"Alternate ";}
194 {
if (Free >= 0) Slot = Free;
195 else {
if (Bump1 >= 0) Slot = Bump1;
196 else Slot = (Bump2 >= 0 ? Bump2 : Bump3);
199 "failed; too many subscribers.");
201 DEBUG(lp->
ID <<
" redirected; too many subscribers.");
206 if (Status &
CMS_isMan) {setAltMan(Slot, lp, sport); aSet=1;}
208 sendAList(NodeTab[Slot]->Link);
210 DEBUG(lp->
ID <<
" bumps " << NodeTab[Slot]->Ident <<
" #" <<Slot);
211 NodeTab[Slot]->Lock();
212 Remove(
"redirected", NodeTab[Slot], -1);
215 NodeTab[Slot] = nP =
new XrdCmsNode(lp, theIF, theNID, port, 0, Slot);
217 if ((cidP->
AddNode(nP, SpecAlt))) nP->cidP = cidP;
218 else {
delete nP; NodeTab[Slot] = 0;
return 0;}
227 if (!aSet && (Status &
CMS_isSuper)) setAltMan(Slot, lp, sport);
228 if (Slot > STHi) STHi = Slot;
236 nP->subsPort = sport;
245 }
else nP->
isMan |= 0x02;
249 if (nP->
isPeer) peerHost |= nP->NodeMask;
250 else peerHost &= ~nP->NodeMask;
251 peerMask = ~peerHost;
256 {
DEBUG(act <<nP->
Ident <<
" to cluster " <<nP->myNID <<
" slot "
257 <<Slot <<
'.' <<nP->Instance <<
" (nodecnt=" <<NodeCnt
281 int port,
int Status,
int sport,
282 const char *theNID,
const char *theIF)
287 int slot = cidP->Slot();
292 {
Say.
Emsg(epname, lp->ID,
"already logged in.");
299 {nP =
new XrdCmsNode(lp, theIF, theNID, port, 0, slot);
300 if (!(cidP->
AddNode(nP,
true))) {
delete nP; nP = 0;}
306 {
Say.
Emsg(epname,
"Add alternate manager", lp->ID,
307 "failed; too many subscribers.");
313 if ((pP = NodeTab[slot]) && !(pP->
isBound))
314 {setAltMan(nP->NodeID, nP->Link, sport);
334 const char *etxt =
"blacklisted.";
344 for (i = 0; i <= STHi; i++)
345 {
if ((nP = NodeTab[i]))
354 etxt =
"blacklisted; redirect unsupported.";
355 else etxt =
"blacklisted with redirect.";
357 nP->
Send((
char *)&discRequest,
sizeof(discRequest));
362 Say.
Emsg(
"Manager", nP->
Name(),
"removed from blacklist.");
375 int iovcnt,
int iotot)
385 bmask = smask & peerMask;
391 for (i = 0; i <= STHi; i++)
392 {
if ((nP = NodeTab[i]) && nP->
isNode(bmask))
396 if (nP->
Send(iod, iovcnt, iotot) < 0)
397 {unQueried |= nP->
Mask();
412 char *Data,
int Dlen)
414 struct iovec ioV[3], *iovP = &ioV[1];
421 Blen =
XrdOucPup::Pack(&iovP, Data, Temp, (Dlen ? strlen(Data)+1 : Dlen));
422 Hdr.
datalen = htons(
static_cast<unsigned short>(Blen));
426 ioV[0].iov_base = (
char *)&Hdr; ioV[0].iov_len =
sizeof(Hdr);
427 return Broadcast(smask, ioV, 3, Blen+
sizeof(Hdr));
433 void *Data,
int Dlen)
435 struct iovec ioV[2] = {{(
char *)&Hdr,
sizeof(Hdr)},
436 {(
char *)Data, (
size_t)Dlen}};
440 Hdr.
datalen = htons(
static_cast<unsigned short>(Dlen));
441 return Broadcast(smask, ioV, 2, Dlen+
sizeof(Hdr));
451 void *Data,
int Dlen)
454 static int Start = 0;
456 struct iovec ioV[2] = {{(
char *)&Hdr,
sizeof(Hdr)},
457 {(
char *)Data, (
size_t)Dlen}};
458 int i, Beg, Fin, ioTot = Dlen+
sizeof(Hdr);
462 Hdr.
datalen = htons(
static_cast<unsigned short>(Dlen));
468 Beg = Start = (Start <= STHi ? Start+1 : 0);
475 do{
for (i = Beg; i <= Fin; i++)
476 {
if ((nP = NodeTab[i]) && nP->
isNode(Who))
480 if (nP->
Send(ioV, 2, ioTot) >= 0) {nP->
unRef();
return 1;}
487 Fin = Beg-1; Beg = 0;
512 for (i = 0; i <= STHi; i++)
513 if ((nP = NodeTab[i]) && nP->
isNode(addr))
514 {smask = nP->NodeMask;
break;}
541 bool retName = (
opts & LS_IDNT) != 0;
542 bool retAny = (
opts & LS_ANY ) != 0;
543 bool retDest = retName || (
opts & LS_IPO);
549 for (i = 0; i <= STHi; i++)
550 if ((nP=NodeTab[i]) && (nP->NodeMask & mask))
553 {
if (nP->netIF.
HasDest(ifType)) ifGet = ifType;
554 else if (!retAny)
continue;
556 if (!nP->netIF.
HasDest(ifGet))
continue;
560 if (retDest) destLen = nP->netIF.
GetPublicDest(sip->Ident, iSize);
562 else {strcpy(sip->Ident, nP->myName); destLen = nP->myNlen;}
563 if (!destLen) {
delete sip;
continue;}
565 sip->IdentLen = destLen;
566 sip->Mask = nP->NodeMask;
567 sip->Id = nP->NodeID;
568 sip->Port = nP->netIF.
Port();
569 sip->RefTotW = nP->RefTotW;
570 sip->RefTotR = nP->RefTotR;
571 sip->Shrin = nP->Shrin;
572 sip->Share = nP->Share;
606 else {
if (*(Sel.
Path.
Val+1) ==
'\0')
607 {Sel.
Vec.hf = ~0LL; Sel.
Vec.pf = Sel.
Vec.wf = 0;
616 {Sel.
Vec.hf = Sel.
Vec.pf = Sel.
Vec.wf = 0;
640 amask = pmask = pinfo.
rovec;
643 if (!(retc = SelDFS(Sel, amask, pmask, smask, 1)))
646 if (retc < 0)
return NotFound;
658 qfVec = pinfo.
rovec; Sel.
Vec.hf = 0;
659 }
else qfVec = Sel.
Vec.bf;
663 if ((!qfVec && retc >= 0) || (Sel.
Vec.hf && Sel.
InfoP)) retc = 0;
687 struct iovec ioV[] = {{(
char *)&
Usage,
sizeof(
Usage)}};
688 int ioVnum =
sizeof(ioV)/
sizeof(
struct iovec);
689 int ioVtot =
sizeof(
Usage);
697 Broadcast(allNodes, ioV, ioVnum, ioVtot);
709 int snooze_interval = 60, snooze_total = 0;
710 int rCnt = 0, wCnt = 0;
711 bool resetW, resetR, resetRW;
718 int totR = 0, totW = 0;
721 for (
int i = 0; i <= STHi; i++)
722 {
if ((nP = NodeTab[i]))
723 {totR += nP->RefTotR;
729 rCnt += (totR - SelRtot); SelRtot = totR;
730 wCnt += (totW - SelWtot); SelWtot = totW;
731 snooze_total += snooze_interval;
735 resetRW = (snooze_total >=
Config.
RefReset && (resetW || resetR));
738 if (resetR) rCnt = 0;
739 if (resetW) wCnt = 0;
778 : myMutex(mtx), myNode(node), hasLK(immed < 0),
781 myNID = node->
ID(myInst);
794 myNode->DropTime = 0;
799 if (!hasLK) myMutex->
UnLock();
801 } LockHandler(&STMutex, theNode, immed);
804 int Inst, NodeID = theNode->
ID(Inst);
811 if (LockHandler.myNID != NodeID || LockHandler.myInst != Inst)
812 {
Say.
Emsg(
"Manager", LockHandler.myIdent,
"removal aborted.");
813 DEBUG(LockHandler.myIdent <<
" node " <<NodeID <<
'.' <<Inst <<
" != "
814 << LockHandler.myNID <<
'.' <<LockHandler.myInst <<
" at entry.");
828 {theNode->
Disc(reason, 0);
835 if (!(NodeTab[NodeID] == theNode))
836 {
const char *why = (theNode->
isMan ?
"dropped as alternate."
837 :
"dropped and redirected.");
839 LockHandler.doDrop =
true;
859 if (theNode->
isMan && theNode->cidP && !(theNode->cidP->
IsSingle())
860 && (altNode = theNode->cidP->
RemNode(theNode)))
861 {
if (altNode->
isBound) NodeCnt++;
862 NodeTab[NodeID] = altNode;
867 setAltMan(altNode->NodeID, altNode->Link, altNode->subsPort);
869 LockHandler.doDrop =
true;
878 LockHandler.myNode = 0;
887 if (theNode->DropJob) theNode->DropJob->
nodeInst = Inst;
888 else theNode->DropJob =
new XrdCmsDrop(NodeID, Inst);
893 Say.
Emsg(
"Manager", theNode->
Ident,
"scheduled for removal;", reason);
894 else DEBUG(theNode->
Ident <<
" node " <<NodeID <<
'.' <<Inst);
904 bool doAll (nMask == 0);
908 if (!isLocked) STMutex.ReadLock();
913 for (
int i = 0; i <= STHi; i++)
914 {
if ((nP = NodeTab[i]) && (doAll || nP->
isNode(nMask)))
917 nP->Shrem = nP->Share;
923 if (!isLocked) STMutex.UnLock();
941 {isRW = 1; Amode =
"write";
947 else {isRW = 0; Amode =
"read"; fRD = 1;}
953 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
954 "No servers have %s access to the file", Amode)+1;
969 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
970 "Too many attempts to gain dfs %s access to the file", Amode)+1;
978 if (!(retc = SelDFS(Sel, amask, pmask, smask, isRW)))
980 if (retc < 0)
return retc;
981 }
else if (noSel)
return 0;
982 return SelNode(Sel, pmask, smask);
998 {pmask = amask & ~(Sel.
Vec.hf | Sel.
Vec.bf); smask = 0;
999 if (!pmask && !Sel.
Vec.bf)
return SelFail(Sel,eNoRep);
1001 else if (Sel.
Vec.bf) pmask = smask = 0;
1002 else if (Sel.
Vec.hf)
1006 && maxBits(Sel.
Vec.hf,2))
return SelFail(Sel,eDups);
1008 != (Sel.
Vec.hf & pinfo.
rovec))
return SelFail(Sel,eROfs);
1010 if (!(pmask = Sel.
Vec.hf & amask))
return SelFail(Sel,eNoSel);
1014 {pmask = amask; smask = 0;}
1015 else if ((smask = pinfo.
ssvec & amask)) pmask = 0;
1016 else pmask = smask = 0;
1018 pmask = Sel.
Vec.hf & amask;
1020 else smask = (retc < 0 ? 0 : pinfo.
ssvec & amask);
1026 Sel.
Vec.hf = Sel.
Vec.pf = pmask = smask = 0;
1032 dowt = (!pmask && !smask);
1046 if (dowt)
return retc;
1047 }
else if (dowt && retc < 0 && !noSel)
1059 if (noSel)
return 0;
1063 if (dowt)
return Unuseable(Sel);
1071 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
1072 "Too many attempts to stage %s access to the file", Amode)+1;
1080 return SelNode(Sel, pmask, smask);
1086 int isrw,
int isMulti,
int ifWant)
1088 static const SMask_t smLow(255);
1097 if (!pmask)
return 0;
1117 {STMutex.ReadLock();
1120 : SelbyLoadR(pmask, selR));
1131 do {
if (!(tmask = pmask & smLow)) Snum += 8;
1132 else {
while((tmask = tmask>>1)) Snum++;
break;}
1133 }
while((pmask = pmask >> 8));
1138 if ((nP = NodeTab[Snum]))
1139 {
if (nP->
isBad) nP = 0;
1145 else {nP->RefTotW++; nP->RefW++;}
1146 else {nP->RefTotR++; nP->RefR++;}
1169 const char *etext, *Item =
"file";
1173 etext =
"Unable to create %s; it already exists.";
1176 case eROfs: etext =
"Unable to modify %s; r/o copy already exists.";
1179 case eDups: etext =
"Unable to modify %s; multiple copies exist.";
1182 case eNoRep: etext =
"Unable to replicate %s; no new sites available.";
1185 case eNoSel:
if (Sel.
Vec.hf & Sel.
nmask)
1186 {etext =
"Unable to access %s; eligible servers shunned.";
1190 {etext =
"Unable to write %s; r/w exports not found.";
1192 etext =
"Unable to access %s; it does not exist.";
1198 default: etext =
"Unable to access %s; it does not exist.";
1203 int n = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data), etext, Item);
1204 if (n < (
int)
sizeof(Sel.
Resp.Data)) Sel.
Resp.DLen = n+1;
1205 else Sel.
Resp.DLen =
sizeof(Sel.
Resp.Data);
1224 bmask = smask & peerMask;
1228 for (i = 0; i <= STHi; i++)
1230 {
if (doAll || !sData.
Total)
1256 static const char statfmt1[] =
"<stats id=\"cms\">"
1257 "<role>%s</role></stats>";
1262 if (!bfr)
return sizeof(statfmt1) + 8;
1268 if ((bln -= mlen) <= 0)
return 0;
1278 static const char statfmt0[] =
"</stats>";
1279 static const char statfmt1[] =
"<stats id=\"cmsm\">"
1280 "<role>%s</role><sel><t>%lld</t><r>%lld</r><w>%lld</w></sel>"
1282 static const char statfmt2[] =
"<stats id=\"%d\">"
1283 "<host>%s</host><role>%s</role>"
1284 "<run>%s</run><ref><r>%d</r><w>%d</w></ref>%s</stats>";
1285 static const char statfmt3[] =
"<shr>%d<use>%d</use></shr>";
1286 static const char statfmt4[] =
"</node>";
1287 static const char statfmt5[] =
1288 "<frq><add>%lld<d>%lld</d></add><rsp>%lld<m>%lld</m></rsp>"
1289 "<lf>%lld</lf><ls>%lld</ls><rf>%lld</rf><rs>%lld</rs></frq>";
1297 int mlen, tlen, n = 0;
1298 char shrBuff[80],
stat[6], *stp;
1306 while((xsp = sp)) {sp = sp->
next;
delete xsp;}
1313 {n =
sizeof(statfmt0) +
1314 sizeof(statfmt1) + 12*3 + 3 + 3 +
1315 (
sizeof(statfmt2) + 10*2 + 256 + 16) *
STMax +
sizeof(statfmt4);
1316 if (AddShr) n +=
sizeof(statfmt3) + 12;
1317 if (AddFrq) n +=
sizeof(statfmt4) + (10*8);
1328 while(sp) {n++; sp = sp->
next;}
1333 long long lclTcnt = SelTcnt, lclRtot = SelRtot, lclWtot = SelWtot;
1334 mlen = snprintf(bfr, bln, statfmt1,
1337 if ((bln -= mlen) <= 0)
return 0;
1338 tlen = mlen; bfr += mlen; n = 0; *shrBuff = 0;
1340 while(sp && bln > 0)
1349 if (AddShr) snprintf(shrBuff,
sizeof(shrBuff), statfmt3,
1351 mlen = snprintf(bfr, bln, statfmt2, n, sp->
Ident,
1354 bfr += mlen; bln -= mlen; tlen += mlen;
1358 if (bln <= (
int)
sizeof(statfmt4))
return 0;
1359 strcpy(bfr, statfmt4); mlen =
sizeof(statfmt4) - 1;
1360 bfr += mlen; bln -= mlen; tlen += mlen;
1362 if (AddFrq && bln > 0)
1363 {mlen = snprintf(bfr, bln, statfmt5, Frq.
Add2Q, Frq.
PBack, Frq.
Resp,
1365 bfr += mlen; bln -= mlen; tlen += mlen;
1370 if (sp || bln < (
int)
sizeof(statfmt0))
return 0;
1371 strcpy(bfr, statfmt0);
1372 return tlen +
sizeof(statfmt0) - 1;
1386 ?
"no eligible servers reachable for"
1387 :
"no eligible servers for");
1390 selR.
reason =
"no eligible servers have space for";
1393 selR.
reason =
"eligible servers overloaded for";
1396 selR.
reason =
"eligible servers suspended for";
1399 selR.
reason =
"eligible servers offline for";
1402 selR.
reason =
"server selection error for";
1417 int XrdCmsCluster::Drop(
int sent,
int sinst,
XrdCmsDrop *djp)
1425 if (djp) STMutex.WriteLock();
1429 if (!(nP = NodeTab[sent]) || nP->Inst() != sinst)
1430 {
if (nP && djp == nP->DropJob) {nP->DropJob = 0; nP->DropTime = 0;}
1431 if (djp) STMutex.UnLock();
1432 DEBUG(sent <<
'.' <<sinst <<
" cancelled.");
1438 if (djp && time(0) < nP->DropTime)
1440 if (djp) STMutex.UnLock();
1458 if (nP->
isPeer) {peerHost &= nP->NodeMask; peerMask = ~peerHost;}
1463 {memset((
void *)&AltMans[sent*AltSize], (
int)
' ', AltSize);
1464 if (sent == AltMent)
1466 while(AltMent >= 0 && NodeTab[AltMent]
1467 && !NodeTab[AltMent]->isMan) AltMent--;
1468 if (AltMent < 0) AltMend = AltMans;
1469 else AltMend = AltMans + ((AltMent+1)*AltSize);
1475 if (sent == STHi)
while(STHi >= 0 && !NodeTab[STHi]) STHi--;
1479 if (nP->NodeMask)
Cache.
Drop(nP->NodeMask, sent, STHi);
1485 if (djp) {STMutex.UnLock(); nP->
Delete(STMutex);}
1490 Say.
Emsg(
"Drop_Node", hname,
"dropped.");
1498 int XrdCmsCluster::Multiple(
SMask_t mVec)
1500 static const unsigned long long Left32 = 0xffffffff00000000LL;
1501 static const unsigned long long Right32 = 0x00000000ffffffffLL;
1502 static const unsigned long long Left16 = 0x00000000ffff0000LL;
1503 static const unsigned long long Right16 = 0x000000000000ffffLL;
1504 static const unsigned long long Left08 = 0x000000000000ff00LL;
1505 static const unsigned long long Right08 = 0x00000000000000ffLL;
1506 static const unsigned long long Left04 = 0x00000000000000f0LL;
1507 static const unsigned long long Right04 = 0x000000000000000fLL;
1509 static const int isMult[16] = {0,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1};
1511 if (mVec & Left32) {
if (mVec & Right32)
return 1;
1512 else mVec = mVec >> 32LL;
1514 if (mVec & Left16) {
if (mVec & Right16)
return 1;
1515 else mVec = mVec >> 16LL;
1517 if (mVec & Left08) {
if (mVec & Right08)
return 1;
1518 else mVec = mVec >> 8LL;
1520 if (mVec & Left04) {
if (mVec & Right04)
return 1;
1521 else mVec = mVec >> 4LL;
1523 return isMult[mVec];
1530 bool XrdCmsCluster::maxBits(
SMask_t mVec,
int mbits)
1537 {mVec &= (mVec - 1);
1539 if (count >= mbits)
return true;
1551 void XrdCmsCluster::Record(
char *path,
const char *reason,
bool force)
1554 static
int msgcnt = 255;
1558 DEBUG(reason <<path);
1560 msgcnt++; skipmsg = msgcnt & (force ? 0x0f : 0xff);
1563 if (!skipmsg)
Say.Emsg(epname, "client deferred;", reason, path);
1574 int affsel = 1, count = 0, isalt = 0, pass = 2;
1582 selR.needNet =
XrdNetIF::Mask(nType);
1588 ? Sel.AltHash : Sel.Path.Hash);
1590 for (count = 0; sVec; count++) sVec &= (sVec - 1);
1591 if (count > 1) selR.
selPack = affsel = (theHash % count) + 1;
1607 mask = pmask & peerMask;
1611 ? SelbyRef(mask,selR)
1613 : SelbyLoadR(pmask, selR));
1624 {
TRACE(Redirect,
"affinity " <<affsel <<
'/' <<count <<
'/'
1626 <<nP->
Name() <<
' ' <<Sel.Path.Val);
1634 Sel.Resp.DLen = nP->netIF.
GetPublicName(Sel.Resp.Data, Sel.Resp.Port);
1635 if (!Sel.Resp.DLen) {nP->
UnLock();
return Unreachable(Sel,
false);}
1636 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1640 if (Sel.iovN && Sel.iovP) nP->
Send(Sel.iovP, Sel.iovN);
1656 {
if (isalt) act = (Sel.iovN ?
" staging " :
" assigned ");
1657 else act =
" serving ";
1659 TRACE(Stage, Sel.Resp.Data <<act <<Sel.Path.Val);
1668 Record(Sel.Path.Val,
"insufficient number of nodes",
true);
1676 Record(Sel.Path.Val, selR.
reason);
1685 {
const char *reason1 = selR.
reason;
1686 int delay1 = selR.
delay;
1687 bool noNet = selR.
xNoNet;
1688 if ((mask = (pmask | amask) & peerHost)) nP = SelbyCost(mask, selR);
1691 Sel.Resp.DLen = nP->netIF.
GetPublicName(Sel.Resp.Data,Sel.Resp.Port);
1692 if (!Sel.Resp.DLen) {nP->
UnLock();
return Unreachable(Sel,
false);}
1693 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1694 if (Sel.iovN && Sel.iovP) nP->
Send(Sel.iovP, Sel.iovN);
1696 TRACE(Stage,
"Peer " <<Sel.Resp.Data <<
" handling " <<Sel.Path.Val);
1710 {Record(Sel.Path.Val, selR.
reason);
1716 if (selR.
xNoNet)
return Unreachable(Sel,
true);
1717 return Unuseable(Sel);
1727 #define RefCount(sP, sPMulti, NeedSpace) \
1728 if (NeedSpace) {sP->RefTotW++; sP->RefW++;} \
1729 else {sP->RefTotR++; sP->RefR++;} \
1730 if (sPMulti && sP->Share && !sP->Shrem--) \
1731 {sP->RefW += sP->Shrip; sP->RefR += sP->Shrip; \
1732 sP->Shrem = sP->Share; sP->Shrin++; \
1751 selR.
Reset(); SelTcnt++;
1752 for (
int i = 0; i <= STHi; i++)
1753 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1757 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1760 else{
if (abs(sp->myCost - np->myCost) <=
Config.
P_fuzz)
1769 else if (sp->RefR > np->RefR) sp=np;
1771 else if (sp->myCost > np->myCost) sp=np;
1778 if (!sp)
return calcDelay(selR);
1796 selR.
Reset(); SelTcnt++;
1797 for (
int i = 0; i <= STHi; i++)
1798 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1802 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1806 {selR.
xFull =
true;
continue;}
1811 else if (sp->myMass > np->myMass) sp=np;
1818 else if (sp->RefR > np->RefR) sp=np;
1820 else if (sp->myLoad > np->myLoad) sp=np;
1828 if (!sp)
return calcDelay(selR);
1841 static std::random_device rand_dev;
1842 static std::default_random_engine generator(rand_dev());
1854 for (
int i = 0; i <= STHi; ++i) {
1857 if (!((np = NodeTab[i]) && (np->NodeMask & mask)))
1865 if (np->
isBad) { selR.
xSusp =
true;
continue; }
1877 NodeWeight[i] = totWeight;
1880 std::uniform_int_distribution<int> distr(1, totWeight);
1881 int selected = distr(generator);
1883 for (
int i = 0; i <= STHi; ++i) {
1884 if (NodeWeight[i] < selected)
1891 return sp ? sp : calcDelay(selR);
1907 selR.
Reset(); SelTcnt++;
1908 for (
int i = 0; i <= STHi; i++)
1909 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1913 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1916 {selR.
xFull =
true;
continue;}
1925 else if (sp->RefR > np->RefR) sp=np;
1931 if (!sp)
return calcDelay(selR);
1944 static const SMask_t allNodes(~0);
1964 Sel.
Vec.hf = amask; Sel.
Vec.wf = (isRW ? amask : 0);
1982 if (isRW && Sel.
Vec.hf)
1992 return SelFail(Sel, eNoEnt);
2002 void XrdCmsCluster::sendAList(
XrdLink *lp)
2005 static int HdrSize =
sizeof(Req.
Hdr) +
sizeof(Req.
sLen);
2006 static char *AltNext = AltMans;
2007 static struct iovec
iov[4] = {{(caddr_t)&Req, (
size_t)HdrSize},
2010 {(caddr_t)
"\0", 1}};
2015 AltNext = AltNext + AltSize;
2016 if (AltNext >= AltMend)
2019 iov[2].iov_len = dlen = AltMend - AltMans;
2021 iov[1].iov_base = (caddr_t)AltNext;
2022 iov[1].iov_len = AltMend - AltNext;
2023 iov[2].iov_len = AltNext - AltMans;
2024 dlen =
iov[1].iov_len +
iov[2].iov_len;
2030 Req.
Hdr.
datalen = htons(
static_cast<unsigned short>(dlen+
sizeof(Req.
sLen)));
2031 Req.
sLen = htons(
static_cast<unsigned short>(dlen));
2035 lp->
Send(
iov, 4, dlen+HdrSize);
2044 void XrdCmsCluster::setAltMan(
int snum,
XrdLink *lp,
int port)
2047 char *ap = &AltMans[snum*AltSize];
2053 memset(ap,
int(
' '), AltSize);
2067 if (ap >= AltMend) {AltMend = ap + AltSize; AltMent = snum;}
2081 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
2082 "No servers are reachable via %s network to %s%s the file.",
2085 Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
2086 "Eligible server is unreachable via %s network to %s%s the file.",
2103 int n = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data),
2104 "No servers are available to %s%s the %s.",
2105 Xmode, Amode, EType);
2106 if (n < (
int)
sizeof(Sel.
Resp.Data)) Sel.
Resp.DLen = n+1;
2107 else Sel.
Resp.DLen =
sizeof(Sel.
Resp.Data);
void Usage(const char *msg)
#define RefCount(sP, sPMulti, NeedSpace)
unsigned long long SMask_t
int stat(const char *path, struct stat *buf)
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
static int Present(const char *hName, XrdOucTList *bList=0, char *rbuff=0, int rblen=0)
int GetFile(XrdCmsSelect &Sel, SMask_t mask)
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
int UnkFile(XrdCmsSelect &Sel, SMask_t mask)
void Drop(SMask_t mask, int SNum, int xHi)
int WT4File(XrdCmsSelect &Sel, SMask_t mask)
static XrdCmsClustID * AddID(const char *cID)
static SMask_t Mask(const char *cID)
XrdCmsNode * RemNode(XrdCmsNode *nP)
static XrdCmsClustID * Find(const char *cID)
bool AddNode(XrdCmsNode *nP, bool isMan)
SMask_t getMask(const XrdNetAddr *addr)
void Space(XrdCms::SpaceData &sData, SMask_t smask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
int Select(XrdCmsSelect &Sel)
int Locate(XrdCmsSelect &Sel)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
void Remove(XrdCmsNode *theNode)
int Stats(char *bfr, int bln)
virtual void BlackList(XrdOucTList *blP)
int Statt(char *bfr, int bln)
static const int RepStat_shr
static const int RepStat_frq
XrdCmsDrop(int nid, int inst)
XrdCmsDrop(XrdCmsNode *nP)
static const char allowsRW
void Delete(XrdSysRWLock &gMutex)
void n2gLock(XrdSysRWLock &gMutex, bool rdlock=false)
int Send(const char *buff, int blen=0)
static const char allowsSS
static const char isDisabled
int isNode(SMask_t smask)
void g2nLock(XrdSysRWLock &gMutex)
static const char isSuspend
void Disc(const char *reason=0, int needLock=1)
void setName(XrdLink *lnkp, const char *theIF, int port)
static const char isDoomed
static const char isBlisted
int Find(const char *pname, XrdCmsPInfo &masks)
void Statistics(Info &Data)
static const char * Type(RoleID rid)
struct XrdCmsSelect::@89 Resp
struct XrdCmsSelect::@88 Vec
static const int IdentSize
void Update(StateType StateT, int ActivVal, int StageVal=0)
const XrdNetAddr * NetAddr() const
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
static const int prefipv4
Use if mapped IPV4 actual format.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
@ fmtName
Hostname if it is resolvable o/w use fmtAddr.
int GetPublicName(char *nbuff, int &nport)
bool HasDest(ifType ifT=PublicV6)
static const char * Name(ifType ifT)
int GetPublicDest(char *dest, size_t dlen)
ifType
The enum that is used to index into ifData to get appropriate interface.
static int Pack(struct iovec **, const char *, unsigned short &buff)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static struct XrdCl::None none
ZipListImpl< false > List(Ctx< ZipArchive > zip)
Factory for creating ZipStatImpl objects.
static const unsigned char kYR_Version
static const int CMS_isSuper
static const int CMS_noStage
static const int CMS_isMan
static const int CMS_isPeer
static const int CMS_Suspend