XRootD
XrdCmsCluster.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s C l u s t e r . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cerrno>
32 #include <fcntl.h>
33 #include <cstdio>
34 #include <cstdlib>
35 #include <random>
36 #include <unistd.h>
37 #include <netinet/in.h>
38 #include <sys/types.h>
39 
40 #include "XProtocol/YProtocol.hh"
41 
42 #include "Xrd/XrdJob.hh"
43 #include "Xrd/XrdLink.hh"
44 #include "Xrd/XrdScheduler.hh"
45 
46 #include "XrdCms/XrdCmsBaseFS.hh"
48 #include "XrdCms/XrdCmsCache.hh"
49 #include "XrdCms/XrdCmsConfig.hh"
50 #include "XrdCms/XrdCmsCluster.hh"
51 #include "XrdCms/XrdCmsClustID.hh"
52 #include "XrdCms/XrdCmsNode.hh"
53 #include "XrdCms/XrdCmsRole.hh"
54 #include "XrdCms/XrdCmsRRQ.hh"
55 #include "XrdCms/XrdCmsState.hh"
56 #include "XrdCms/XrdCmsSelect.hh"
57 #include "XrdCms/XrdCmsTrace.hh"
58 #include "XrdCms/XrdCmsTypes.hh"
59 
60 #include "XrdOuc/XrdOucPup.hh"
61 
62 #include "XrdSys/XrdSysPlatform.hh"
63 #include "XrdSys/XrdSysPthread.hh"
64 #include "XrdSys/XrdSysTimer.hh"
65 
66 using namespace XrdCms;
67 
68 /******************************************************************************/
69 /* G l o b a l O b j e c t s */
70 /******************************************************************************/
71 
73 
74 /******************************************************************************/
75 /* L o c a l S t r u c t u r e s */
76 /******************************************************************************/
77 
// NOTE(review): this listing omits the line that opens the class (the embedded
// source numbers jump from 77 to 79) and the member declarations near the end
// (98 and 100-102 are absent), so the base class and member types are not
// visible here.
//
// Scheduler job that disposes of a cluster node. One constructor is given a
// node pointer, the other a node id/instance pair; DoIt() acts on whichever
// was supplied.
79 {
80 public:
81 
// Invoked when the job is dispatched. With a node pointer, the node's Delete()
// is called with the cluster table lock and the job then frees itself.
// Otherwise Cluster.Drop() is called with the id/instance pair and this job;
// the job frees itself only when Drop() returns zero (presumably Drop() keeps
// or reuses the job otherwise -- confirm against Drop()).
82  void DoIt() {if (nodeP)
83  {nodeP->Delete(Cluster.STMutex);
84  delete this;
85  } else {
86  if (!Cluster.Drop(nodeEnt, nodeInst, this)) delete this;
87  }
88  }
89 
// Node-pointer form: the job is handed to the scheduler from the constructor
// with no explicit run time.
90  XrdCmsDrop(XrdCmsNode *nP) : XrdJob("delete node"), nodeP(nP),
91  nodeEnt(0), nodeInst(0)
92  {Sched->Schedule((XrdJob *)this);}
93 
// Id/instance form: the job is scheduled for time(0)+Config.DRPDelay.
94  XrdCmsDrop(int nid, int inst) : XrdJob("drop node"), nodeP(0),
95  nodeEnt(nid), nodeInst(inst)
96  {Sched->Schedule((XrdJob *)this, time(0)+Config.DRPDelay);}
97 
99 
103 };
104 
105 /******************************************************************************/
106 /* C o n s t r u c t o r */
107 /******************************************************************************/
108 
110 {
111  memset((void *)NodeTab, 0, sizeof(NodeTab));
112  memset((void *)AltMans, (int)' ', sizeof(AltMans));
113  AltMend = AltMans;
114  AltMent = -1;
115  NodeCnt = 0;
116  STHi = -1;
117  SelWtot = 0;
118  SelRtot = 0;
119  SelTcnt = 0;
120  peerHost = 0;
121  peerMask = ~peerHost;
122 }
123 
124 /******************************************************************************/
125 /* A d d */
126 /******************************************************************************/
127 
// Add a node arriving on link 'lp' to the cluster table.
//
// The node table is scanned for an entry that matches the caller (a reconnect)
// while remembering the first free slot and the candidates that may be bumped.
// A matching entry that is still bound is rejected as "already logged in";
// otherwise the existing entry is re-hooked to the new link. A manager or peer
// that is not a supervisor is first tried as an alternate through AddAlt().
// Failing all that, a slot is picked (free, else a bumpable one) and a new node
// object is created and registered with its cluster ID.
//
// Returns the node with its lock held (see the Lock() call at the end), or 0
// when the login is refused or the caller was redirected.
//
// NOTE(review): this listing is missing two statements inside the function
// (embedded numbers 235 and 264); see the notes at those spots.
128 XrdCmsNode *XrdCmsCluster::Add(XrdLink *lp, int port, int Status, int sport,
129  const char *theNID, const char *theIF)
130 
131 {
132  EPNAME("Add")
133  const char *act = "";
134  XrdCmsNode *nP = 0;
135  XrdCmsClustID *cidP = 0;
136  XrdSysRWLockHelper STMHelper(STMutex, false); // Need write lock!
137  int tmp, Slot, Free = -1, Bump1 = -1, Bump2 = -1, Bump3 = -1, aSet = 0;
138  bool Special = (Status & (CMS_isMan|CMS_isPeer));
139  bool SpecAlt = (Special && !(Status & CMS_isSuper));
140  bool Hidden = false;
141 
142 // Find available slot for this node. Here are the priorities:
143 // Slot = Reconnecting node
144 // Free = Available slot ( 1st in table)
145 // Bump1 = Disconnected server (last in table)
146 // Bump2 = Connected server (last in table) if new one is manager/peer
147 // Bump3 = Disconnected manager/peer ( 1st in table) if new one is manager/peer
148 //
149  for (Slot = 0; Slot < STMax; Slot++)
150  if (NodeTab[Slot])
151  {if (NodeTab[Slot]->isNode(lp, theNID, port)) break;
152 /*Conn*/ if (NodeTab[Slot]->isConn)
153  {if (!NodeTab[Slot]->isPerm && Special)
154  Bump2 = Slot; // Last conn Server
155 /*Disc*/ } else {
156  if ( NodeTab[Slot]->isPerm)
157  {if (Bump3 < 0 && Special) Bump3 = Slot;}// 1st disc Man/Pr
158  else Bump1 = Slot; // Last disc Server
159  }
160  } else if (Free < 0) Free = Slot; // 1st free slot
161 
162 // Check if node is already logged in or is a relogin. Slot < STMax means the
// scan above stopped early on a matching entry.
163 //
164  if (Slot < STMax)
165  {if (NodeTab[Slot] && NodeTab[Slot]->isBound)
166  {Say.Emsg("Cluster", lp->ID, "already logged in.");
167  return 0;
168  } else { // Rehook node to previous unconnected entry
169  nP = NodeTab[Slot];
170  nP->Link = lp;
171  nP->isOffline = 0;
172  nP->isBad &= ~XrdCmsNode::isSuspend;
173  nP->isConn = 1;
174  nP->Instance++;
175  nP->setName(lp, theIF, port); // Just in case it changed
176  act = "Reconnect ";
177  }
178  }
179 
180 // First see if this node may be an alternate. AddAlt() may return a node that
// is not the one held in NodeTab[Slot]; such a node is treated as hidden.
181 //
182  if (!nP && SpecAlt)
183  {if ((cidP = XrdCmsClustID::Find(theNID)) && !(cidP->IsEmpty()))
184  {if (!(nP = AddAlt(cidP, lp, port, Status, sport, theNID, theIF)))
185  return 0;
186  aSet = 1; Slot = nP->NodeID;
187  if (nP != NodeTab[Slot]) {Hidden = true; act = "Alternate ";}
188  }
189  }
190 
191 // Reuse an old ID if we must or redirect the incoming node. Preference order
// is Free, Bump1, Bump2, then Bump3; when none is usable a peer is refused and
// any other node is sent the alternate list instead.
192 //
193  if (!nP)
194  {if (Free >= 0) Slot = Free;
195  else {if (Bump1 >= 0) Slot = Bump1;
196  else Slot = (Bump2 >= 0 ? Bump2 : Bump3);
197  if (Slot < 0)
198  {if (Status & CMS_isPeer) Say.Emsg("Cluster", "Add peer", lp->ID,
199  "failed; too many subscribers.");
200  else {sendAList(lp);
201  DEBUG(lp->ID <<" redirected; too many subscribers.");
202  }
203  return 0;
204  }
205 
206  if (Status & CMS_isMan) {setAltMan(Slot, lp, sport); aSet=1;}
207  if (NodeTab[Slot] && !(Status & CMS_isPeer))
208  sendAList(NodeTab[Slot]->Link);
209 
// The occupant of the chosen slot is locked and removed with immed = -1; see
// the Remove() warning about the table lock already being held by the caller.
210  DEBUG(lp->ID << " bumps " << NodeTab[Slot]->Ident <<" #" <<Slot);
211  NodeTab[Slot]->Lock();
212  Remove("redirected", NodeTab[Slot], -1);
213  act = "Shoved ";
214  }
215  NodeTab[Slot] = nP = new XrdCmsNode(lp, theIF, theNID, port, 0, Slot);
216  if (!cidP) cidP = XrdCmsClustID::AddID(theNID);
217  if ((cidP->AddNode(nP, SpecAlt))) nP->cidP = cidP;
218  else {delete nP; NodeTab[Slot] = 0; return 0;} // OK to do delete!
219  }
220 
221 // Indicate whether this node can be redirected
222 //
223  nP->isPerm = (Status & (CMS_isMan | CMS_isPeer)) ? 1 : 0;
224 
225 // Assign new server
226 //
227  if (!aSet && (Status & CMS_isSuper)) setAltMan(Slot, lp, sport);
228  if (Slot > STHi) STHi = Slot;
229  nP->isBound = 1;
230  nP->isConn = 1;
231  nP->isNoStage = 0 != (Status & CMS_noStage);
232  nP->isBad |= (Status & CMS_Suspend ? XrdCmsNode::isSuspend : 0);
233  nP->isMan = 0 != (Status & CMS_isMan);
234  nP->isPeer = 0 != (Status & CMS_isPeer);
// NOTE(review): embedded line 235 is absent from this listing; one statement
// may be missing between the two neighbouring lines.
236  nP->subsPort = sport;
237 
238 // If this is an actual non-hidden node, count it. A hidden (alternate) node
// instead gets the 0x02 bit set in isMan.
239 //
240  if (!Hidden)
241  {NodeCnt++;
242  if (Config.SUPLevel
243  && (tmp = NodeCnt*Config.SUPLevel/100) > Config.SUPCount)
244  {Config.SUPCount=tmp; CmsState.Set(tmp);}
245  } else nP->isMan |= 0x02;
246 
247 // Compute new peer mask, as needed
248 //
249  if (nP->isPeer) peerHost |= nP->NodeMask;
250  else peerHost &= ~nP->NodeMask;
251  peerMask = ~peerHost;
252 
253 // Document login
254 //
255  if (QTRACE(Debug))
256  {DEBUG(act <<nP->Ident <<" to cluster " <<nP->myNID <<" slot "
257  <<Slot <<'.' <<nP->Instance <<" (nodecnt=" <<NodeCnt
258  <<" supn=" <<Config.SUPCount <<")");
259  }
260 
261 // Compute new state of all nodes if we are a reporting manager.
// NOTE(review): embedded line 264 is absent from this listing; the two lines
// after the 'if' look like the trailing arguments of a call whose first line
// is not shown.
262 //
263  if (Config.asManager() && !Hidden)
265  nP->isBad & XrdCmsNode::isSuspend ? 0 : 1,
266  nP->isNoStage ? 0 : 1);
267 
268 // All done. Return the node locked.
269 //
270  nP->Lock();
271  return nP;
272 }
273 
274 /******************************************************************************/
275 /* Private: A d d A l t */
276 /******************************************************************************/
277 
278 // Warning STMutex must be held in write mode by the caller!
279 
280 XrdCmsNode *XrdCmsCluster::AddAlt(XrdCmsClustID *cidP, XrdLink *lp,
281  int port, int Status, int sport,
282  const char *theNID, const char *theIF)
283 
284 {
285  EPNAME("AddAlt")
286  XrdCmsNode *pP, *nP = 0;
287  int slot = cidP->Slot();
288 
289 // Check if this node is already in the alternate table
290 //
291  if (cidP->Exists(lp, theNID, port))
292  {Say.Emsg(epname, lp->ID, "already logged in.");
293  return 0;
294  }
295 
296 // Add this node if there is room
297 //
298  if (cidP->Avail())
299  {nP = new XrdCmsNode(lp, theIF, theNID, port, 0, slot);
300  if (!(cidP->AddNode(nP, true))) {delete nP; nP = 0;} // OK to do delete!
301  }
302 
303 // Check if we were actually able to add this node
304 //
305  if (!nP)
306  {Say.Emsg(epname, "Add alternate manager", lp->ID,
307  "failed; too many subscribers.");
308  return 0;
309  }
310 
311 // Check if the existing lead dead and we can substiture this one
312 //
313  if ((pP = NodeTab[slot]) && !(pP->isBound))
314  {setAltMan(nP->NodeID, nP->Link, sport);
315  Say.Emsg("AddAlt", nP->Ident, "replacing dropped", pP->Ident);
316  NodeTab[slot] = nP;
317  pP->DropJob = new XrdCmsDrop(pP); // Schedule deletion
318  }
319 
320 // Hook the node to the cluster table and return
321 //
322  nP->cidP = cidP;
323  return nP;
324 }
325 
326 /******************************************************************************/
327 /* B l a c k L i s t */
328 /******************************************************************************/
329 
// NOTE(review): the signature line is absent from this listing (embedded
// number 330), as are three statements inside the loop (351, 356 and 361).
// 'blP' is used below as the blacklist handle and is presumably the sole
// parameter -- confirm against the header.
//
// Walks the node table and, for every node whose blacklist status differs from
// what XrdCmsBlackList::Present() now reports, handles the transition and logs
// it. The table is write-locked for the scan; while an individual node is
// being handled the lock is swapped for that node's lock via g2nLock()/
// n2gLock().
331 {
332  static CmsDiscRequest discRequest = {{0, kYR_disc, 0, 0}};
333  XrdCmsNode *nP;
334  const char *etxt = "blacklisted.";
335  int i, blRD = 0;
336  bool inBL;
337 
338 // Obtain a lock on the table. We need this in write mode!
339 //
340  STMutex.WriteLock();
341 
342 // Run through the table looking for nodes to put into or take out of the
// blacklist. A node whose isBlisted flag already agrees with the list is
// skipped.
343 //
344  for (i = 0; i <= STHi; i++)
345  {if ((nP = NodeTab[i]))
346  {inBL = (blP && (blRD = XrdCmsBlackList::Present(nP->Name(), blP)));
347  if ((!inBL && !(nP->isBad & XrdCmsNode::isBlisted))
348  || ( inBL && (nP->isBad & XrdCmsNode::isBlisted))) continue;
349  nP->g2nLock(STMutex); // Downgrade to only node lock
350  if (inBL)
// NOTE(review): embedded line 351 is missing here (presumably the statement
// that opens this branch and flags the node). etxt is only ever assigned in
// the visible lines below and is not reset per node, so a redirect text chosen
// for one node would also be logged for later plain-blacklisted nodes unless a
// missing line resets it -- confirm.
352  if (blRD < -1)
353  {if (kYR_Version > nP->myVersion)
354  etxt = "blacklisted; redirect unsupported.";
355  else etxt = "blacklisted with redirect.";
// NOTE(review): embedded line 356 is missing here.
357  nP->Send((char *)&discRequest, sizeof(discRequest));
358  }
359  Say.Emsg("Manager", nP->Name(), etxt);
360  } else {
// NOTE(review): embedded line 361 is missing here (presumably the statement
// that clears the node's blacklist flag).
362  Say.Emsg("Manager", nP->Name(), "removed from blacklist.");
363  }
364  nP->n2gLock(STMutex);
365  }
366  }
367  STMutex.UnLock();
368 }
369 
370 /******************************************************************************/
371 /* B r o a d c a s t */
372 /******************************************************************************/
373 
374 SMask_t XrdCmsCluster::Broadcast(SMask_t smask, const struct iovec *iod,
375  int iovcnt, int iotot)
376 {
377  EPNAME("Broadcast")
378  int i;
379  XrdCmsNode *nP;
380  SMask_t bmask, unQueried(0);
381 
382 // Obtain a lock on the table and screen out peer nodes
383 //
384  STMutex.ReadLock(); // Sufficient to prevent modifications
385  bmask = smask & peerMask;
386 
387 // Run through the table looking for nodes to send messages to. We don't need
388 // the node lock for this but we do need to up the reference count to keep the
389 // node pointer valid for the duration of the send() (may or may not block).
390 //
391  for (i = 0; i <= STHi; i++)
392  {if ((nP = NodeTab[i]) && nP->isNode(bmask))
393  {if (nP->isOffline) unQueried |= nP->Mask();
394  else {nP->Ref();
395  STMutex.UnLock();
396  if (nP->Send(iod, iovcnt, iotot) < 0)
397  {unQueried |= nP->Mask();
398  DEBUG(nP->Ident <<" is unreachable");
399  }
400  nP->unRef();
401  STMutex.ReadLock();
402  }
403  }
404  }
405  STMutex.UnLock();
406  return unQueried;
407 }
408 
409 /******************************************************************************/
410 
412  char *Data, int Dlen)
413 {
414  struct iovec ioV[3], *iovP = &ioV[1];
415  unsigned short Temp;
416  int Blen;
417 
418 // Construct packed data for the character argument. If data is a string then
419 // Dlen must include the null byte if it is specified at all.
420 //
421  Blen = XrdOucPup::Pack(&iovP, Data, Temp, (Dlen ? strlen(Data)+1 : Dlen));
422  Hdr.datalen = htons(static_cast<unsigned short>(Blen));
423 
424 // Complete the iovec and send off the data
425 //
426  ioV[0].iov_base = (char *)&Hdr; ioV[0].iov_len = sizeof(Hdr);
427  return Broadcast(smask, ioV, 3, Blen+sizeof(Hdr));
428 }
429 
430 /******************************************************************************/
431 
433  void *Data, int Dlen)
434 {
435  struct iovec ioV[2] = {{(char *)&Hdr, sizeof(Hdr)},
436  {(char *)Data, (size_t)Dlen}};
437 
438 // Send of the data as eveything was constructed properly
439 //
440  Hdr.datalen = htons(static_cast<unsigned short>(Dlen));
441  return Broadcast(smask, ioV, 2, Dlen+sizeof(Hdr));
442 }
443 
444 /******************************************************************************/
445 /* B r o a d s e n d */
446 /******************************************************************************/
447 
448 // Send message to first eligible node!
449 
451  void *Data, int Dlen)
452 {
453  EPNAME("Broadsend");
454  static int Start = 0;
455  XrdCmsNode *nP;
456  struct iovec ioV[2] = {{(char *)&Hdr, sizeof(Hdr)},
457  {(char *)Data, (size_t)Dlen}};
458  int i, Beg, Fin, ioTot = Dlen+sizeof(Hdr);
459 
460 // Send of the data as eveything was constructed properly
461 //
462  Hdr.datalen = htons(static_cast<unsigned short>(Dlen));
463 
464 // Obtain a lock on the table and get the starting and ending position. Note
465 // that the mechnism we use will necessarily skip newly added nodes.
466 //
467  STMutex.ReadLock(); // Sufficient to prevent modifications
468  Beg = Start = (Start <= STHi ? Start+1 : 0);
469  Fin = STHi;
470 
471 // Run through the table looking for a node to send a message to. We don't need
472 // the node lock for this but we do need to up the reference count to keep the
473 // node pointer valid for the duration of the send() (may or may not block).
474 //
475 do{for (i = Beg; i <= Fin; i++)
476  {if ((nP = NodeTab[i]) && nP->isNode(Who))
477  {if (nP->isOffline) continue;
478  nP->Ref();
479  STMutex.UnLock();
480  if (nP->Send(ioV, 2, ioTot) >= 0) {nP->unRef(); return 1;}
481  DEBUG(nP->Ident <<" is unreachable");
482  nP->unRef();
483  STMutex.ReadLock();
484  }
485  }
486  if (!Beg) break;
487  Fin = Beg-1; Beg = 0;
488  } while(1);
489 
490 // Did not send to anyone
491 //
492  STMutex.UnLock();
493  return 0;
494 }
495 
496 /******************************************************************************/
497 /* g e t M a s k */
498 /******************************************************************************/
499 
501 {
502  int i;
503  XrdCmsNode *nP;
504  SMask_t smask(0);
505 
506 // Obtain a lock on the table
507 //
508  STMutex.ReadLock();
509 
510 // Run through the table looking for a node with matching IP address
511 //
512  for (i = 0; i <= STHi; i++)
513  if ((nP = NodeTab[i]) && nP->isNode(addr))
514  {smask = nP->NodeMask; break;}
515 
516 // All done
517 //
518  STMutex.UnLock();
519  return smask;
520 }
521 
522 /******************************************************************************/
523 
// NOTE(review): the signature line is absent from this listing (embedded
// number 524); 'Cid' is declared there.
// This form does no table scan of its own: it returns whatever
// XrdCmsClustID::Mask() reports for the given cluster ID.
525 {
526  return XrdCmsClustID::Mask(Cid);
527 }
528 
529 /******************************************************************************/
530 /* L i s t */
531 /******************************************************************************/
532 
// NOTE(review): the signature line is absent from this listing (embedded
// number 533); 'mask', 'opts' and 'oksel' are declared there. One statement
// inside the loop (575) is missing as well.
//
// Builds a linked list of XrdCmsSelected entries, one per node in the table
// whose NodeMask intersects 'mask', and returns the head of the list (0 when
// nothing qualified). Each new entry is constructed with the current head and
// then becomes the head itself. 'oksel' is set to true as soon as any node
// matches the mask, even if that node is later skipped.
534 {
535  static const int iSize = XrdCmsSelected::IdentSize;
536  XrdCmsNode *nP;
537  XrdCmsSelected *sipp = 0, *sip;
538  XrdNetIF::ifType ifType = (XrdNetIF::ifType)(opts & LS_IFMASK);
539  XrdNetIF::ifType ifGet = ifType;
540  int i, destLen;
541  bool retName = (opts & LS_IDNT) != 0;
542  bool retAny = (opts & LS_ANY ) != 0;
543  bool retDest = retName || (opts & LS_IPO);
544 
545 // Scan the table under a read lock. When a destination is wanted (retDest),
// a node lacking one for the requested interface type is skipped unless
// LS_ANY was given, in which case the type with the PrivateIF bit flipped is
// tried before giving up on the node.
546 //
547  oksel = false;
548  STMutex.ReadLock();
549  for (i = 0; i <= STHi; i++)
550  if ((nP=NodeTab[i]) && (nP->NodeMask & mask))
551  {oksel = true;
552  if (retDest)
553  { if (nP->netIF.HasDest(ifType)) ifGet = ifType;
554  else if (!retAny) continue;
555  else {ifGet = (XrdNetIF::ifType)(ifType ^ XrdNetIF::PrivateIF);
556  if (!nP->netIF.HasDest(ifGet)) continue;
557  }
558  }
// The identification is either the interface destination or the node name; a
// name too long for the Ident buffer yields a zero length, and an entry with
// zero length is discarded.
559  sip = new XrdCmsSelected(sipp);
560  if (retDest) destLen = nP->netIF.GetPublicDest(sip->Ident, iSize);
561  else if (nP->myNlen >= XrdCmsSelected::IdentSize) destLen = 0;
562  else {strcpy(sip->Ident, nP->myName); destLen = nP->myNlen;}
563  if (!destLen) {delete sip; continue;}
564 
565  sip->IdentLen = destLen;
566  sip->Mask = nP->NodeMask;
567  sip->Id = nP->NodeID;
568  sip->Port = nP->netIF.Port();
569  sip->RefTotW = nP->RefTotW;
570  sip->RefTotR = nP->RefTotR;
571  sip->Shrin = nP->Shrin;
572  sip->Share = nP->Share;
573  sip->RoleID = nP->RoleID;
574  sip->Status = (nP->isOffline ? XrdCmsSelected::Offline : 0);
// NOTE(review): embedded line 575 is missing here; it presumably holds the
// condition that guards the Disable status on the next line.
576  sip->Status |= XrdCmsSelected::Disable;
577  if (nP->isNoStage) sip->Status |= XrdCmsSelected::NoStage;
578  if (nP->isBad & XrdCmsNode::isSuspend)
579  sip->Status |= XrdCmsSelected::Suspend;
580  if (nP->isRW ) sip->Status |= XrdCmsSelected::isRW;
581  if (nP->isMan ) sip->Status |= XrdCmsSelected::isMangr;
582  sipp = sip;
583  }
584  STMutex.UnLock();
585 
586 // Return result
587 //
588  return sipp;
589 }
590 
591 /******************************************************************************/
592 /* L o c a t e */
593 /******************************************************************************/
594 
// NOTE(review): the signature line is absent from this listing (embedded
// number 595); 'Sel' is declared there. One statement near the end (671) is
// missing as well.
//
// Fills Sel.Vec with the node vectors for Sel.Path. The function returns 0
// when the vectors are ready, NotFound when no node exports the path (or the
// shared-filesystem selection fails), and otherwise a wait value taken from
// Cache.WT4File(), Config.LUPDelay or Wait4CBk.
596 {
597  EPNAME("Locate");
598  XrdCmsPInfo pinfo;
599  SMask_t qfVec(0);
600  char *Path;
601  int retc = 0;
602 
603 // Check if this is a locate for all current servers. A path of just "*" asks
// for every server; "*<path>" strips the asterisk and continues with <path>.
604 //
605  if (*Sel.Path.Val != '*') Path = Sel.Path.Val;
606  else {if (*(Sel.Path.Val+1) == '\0')
607  {Sel.Vec.hf = ~0LL; Sel.Vec.pf = Sel.Vec.wf = 0;
608  return 0;
609  }
610  Path = Sel.Path.Val+1;
611  }
612 
613 // Find out who serves this path
614 //
615  if (!Cache.Paths.Find(Path, pinfo) || !pinfo.rovec)
616  {Sel.Vec.hf = Sel.Vec.pf = Sel.Vec.wf = 0;
617  return NotFound;
618  } else Sel.Vec.wf = pinfo.rwvec;
619 
620 // Check if this was a non-lookup request ("*<path>"): report the exporting
// nodes without consulting the file cache.
621 //
622  if (*Sel.Path.Val == '*')
623  {Sel.Vec.hf = pinfo.rovec; Sel.Vec.pf = 0;
624  Sel.Vec.wf = pinfo.rwvec;
625  return 0;
626  }
627 
628 // Complete the request info object if we have one
629 //
630  if (Sel.InfoP)
631  {Sel.InfoP->rwVec = pinfo.rwvec;
632  Sel.InfoP->isLU = 1;
633  }
634 
635 // If we are running a shared file system perform an optional restricted
636 // pre-selection and then do a standard selection.
637 //
638  if (baseFS.isDFS())
639  {SMask_t amask, smask, pmask;
640  amask = pmask = pinfo.rovec;
641  smask = (Sel.Opts & XrdCmsSelect::Online ? 0 : pinfo.ssvec & amask);
642  Sel.Resp.DLen = 0;
643  if (!(retc = SelDFS(Sel, amask, pmask, smask, 1)))
644  return (Sel.Opts & XrdCmsSelect::Asap && Sel.InfoP
645  ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
646  if (retc < 0) return NotFound;
647  return 0;
648  }
649 
650 // First check if we have seen this file before. If so, get nodes that have it.
651 // A Refresh request kills this because it's as if we hadn't seen it before.
652 // If the file was found but either a query is in progress or we have a server
653 // bounce; the client must wait.
654 //
655  if (Sel.Opts & XrdCmsSelect::Refresh
656  || !(retc = Cache.GetFile(Sel, pinfo.rovec)))
657  {Cache.AddFile(Sel, 0);
658  qfVec = pinfo.rovec; Sel.Vec.hf = 0;
659  } else qfVec = Sel.Vec.bf;
660 
661 // Compute the delay, if any
662 //
663  if ((!qfVec && retc >= 0) || (Sel.Vec.hf && Sel.InfoP)) retc = 0;
664  else if (!(retc = Cache.WT4File(Sel, Sel.Vec.hf))) retc = Wait4CBk;
665 
666 // Check if we have to ask any nodes if they have the file. Nodes that could
// not be queried (the mask Broadcast() returns) are passed to Cache.UnkFile().
667 //
668  if (qfVec)
669  {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
670  if (Sel.Opts & XrdCmsSelect::Refresh)
// NOTE(review): embedded line 671 is missing here; it presumably holds the
// statement governed by the 'if' above, so the TRACE below is most likely
// unconditional in the real source.
672  TRACE(Files, "seeking " <<Sel.Path.Val);
673  qfVec = Cluster.Broadcast(qfVec, QReq.Hdr,
674  (void *)Sel.Path.Val, Sel.Path.Len+1);
675  if (qfVec) Cache.UnkFile(Sel, qfVec);
676  }
677  return retc;
678 }
679 
680 /******************************************************************************/
681 /* M o n P e r f */
682 /******************************************************************************/
683 
685 {
686  CmsUsageRequest Usage = {{0, kYR_usage, 0, 0}};
687  struct iovec ioV[] = {{(char *)&Usage, sizeof(Usage)}};
688  int ioVnum = sizeof(ioV)/sizeof(struct iovec);
689  int ioVtot = sizeof(Usage);
690  SMask_t allNodes(~0);
691  int uInterval = Config.AskPing*Config.AskPerf;
692 
693 // Sleep for the indicated amount of time, then ask for load on each server
694 //
695  while(uInterval)
696  {XrdSysTimer::Snooze(uInterval);
697  Broadcast(allNodes, ioV, ioVnum, ioVtot);
698  }
699  return (void *)0;
700 }
701 
702 /******************************************************************************/
703 /* M o n R e f s */
704 /******************************************************************************/
705 
707 {
708  XrdCmsNode *nP;
709  int snooze_interval = 60, snooze_total = 0;
710  int rCnt = 0, wCnt = 0;
711  bool resetW, resetR, resetRW;
712 
713 // Sleep for the snooze interval. If a reset was requested then do a selective
714 // reset unless we reached our snooze maximum and enough selections have gone
715 // by; in which case, do a global reset.
716 //
717  do {XrdSysTimer::Snooze(snooze_interval);
718  int totR = 0, totW = 0;
719 
720  STMutex.ReadLock();
721  for (int i = 0; i <= STHi; i++)
722  {if ((nP = NodeTab[i]))
723  {totR += nP->RefTotR;
724  totW += nP->RefTotW;
725  }
726  }
727  STMutex.UnLock();
728 
729  rCnt += (totR - SelRtot); SelRtot = totR;
730  wCnt += (totW - SelWtot); SelWtot = totW;
731  snooze_total += snooze_interval;
732 
733  resetR = (rCnt >= Config.RefTurn);
734  resetW = (wCnt >= Config.RefTurn);
735  resetRW = (snooze_total >= Config.RefReset && (resetW || resetR));
736  if (resetRW)
737  {ResetRef((SMask_t)0);
738  if (resetR) rCnt = 0;
739  if (resetW) wCnt = 0;
740  snooze_total = 0;
741  }
742  } while(1);
743 
744  return (void *)0;
745 }
746 
747 /******************************************************************************/
748 /* R e m o v e */
749 /******************************************************************************/
750 
751 // Warning! The node object must be locked upon entry. The lock is released
752 // upon deletion of the object.
753 
755 {
756  theNode->DropJob = new XrdCmsDrop(theNode);
757 }
758 
759 // Warning! The node object must be locked upon entry. The lock is released
760 // prior to returning to the caller. This entry obtains the node
761 // table lock. When immed != 0 then the node is immediately dropped.
762 // When immed is < 0 then the caller already holds the STMutex in
763 // write mode and it is not released upon exit.
764 
// Take a node out of service. Depending on its state the node is
// disconnected, dropped as a non-primary node, replaced by a working
// alternate, dropped at once, or scheduled for a delayed drop.
//
//   reason   text used in log messages and passed to Disc(); may be null
//   theNode  the node to remove, locked by the caller
//   immed    != 0 requests an immediate drop; < 0 additionally tells the lock
//            handler that the table lock is already held, so it is neither
//            taken nor released here
//
// NOTE(review): this listing is missing two statements inside the function
// (embedded numbers 851 and 864); see the notes at those spots.
765 void XrdCmsCluster::Remove(const char *reason, XrdCmsNode *theNode, int immed)
766 {
767  EPNAME("Remove_Node")
// Scope guard for the locks. Construction records the node's identity and,
// unless the table lock is already held, takes it in write mode; to do that
// the node lock is released and re-taken, with a reference keeping the node
// alive in between. Destruction unlocks the node (first scheduling a delete
// job when doDrop was set) and releases the table lock if it was taken here.
// Setting myNode to 0 tells the destructor to leave the node alone.
768  struct theLocks
769  {XrdSysRWLock *myMutex;
770  XrdCmsNode *myNode;
771  int myNID;
772  int myInst;
773  bool hasLK;
774  bool doDrop;
775  char myIdent[510];
776 
777  theLocks(XrdSysRWLock *mtx, XrdCmsNode *node, int immed)
778  : myMutex(mtx), myNode(node), hasLK(immed < 0),
779  doDrop(false)
780  {strlcpy(myIdent, node->Ident, sizeof(myIdent));
781  myNID = node->ID(myInst);
782  if (!hasLK)
783  {myNode->Ref(); // Keep alive
784  myNode->UnLock();
785  myMutex->WriteLock(); // Get global lock
786  myNode->Lock();
787  myNode->unRef(); // Can't escape now
788  }
789  }
790  ~theLocks()
791  {if (myNode)
792  {if (doDrop)
793  {myNode->isBound = 0;
794  myNode->DropTime = 0;
795  myNode->DropJob = new XrdCmsDrop(myNode);
796  myNode->UnLock();
797  } else myNode->UnLock();
798  }
799  if (!hasLK) myMutex->UnLock();
800  }
801  } LockHandler(&STMutex, theNode, immed);
802 
803  XrdCmsNode *altNode = 0;
804  int Inst, NodeID = theNode->ID(Inst);
805 
806 // The LockHandler makes sure that the proper locks are obtained in a deadlock
807 // free order. However, this may require that the node lock be released and
808 // then re-acquired. We check if we are still dealing with same node at entry.
809 // If not, issue message and high-tail it out.
// NOTE(review): despite the sentence above, the visible code only logs the
// mismatch; no return follows, so processing continues -- confirm the intent.
810 //
811  if (LockHandler.myNID != NodeID || LockHandler.myInst != Inst)
812  {Say.Emsg("Manager", LockHandler.myIdent, "removal aborted.");
813  DEBUG(LockHandler.myIdent <<" node " <<NodeID <<'.' <<Inst <<" != "
814  << LockHandler.myNID <<'.' <<LockHandler.myInst <<" at entry.");
815  }
816 
817 // Mark node as being offline and remove any drop job from it
818 //
819  theNode->isOffline = 1; // Global lock is held here
820 
821 // If the node is connected we simply close the connection. This will cause
822 // the connection handler to re-initiate the node removal. This condition
823 // exists only if one node is being displaced by another node. The Disc()
824 // may take a long time, but it's done async by default on the WAN and sync
825 // on the LAN (local connections are fast enough and error-free for this).
826 //
827  if (theNode->isConn)
828  {theNode->Disc(reason, 0);
829  theNode->isGone = 1; // Disc() sets the isOffline flag
830  return;
831  }
832 
833 // If we are not the primary node, then get rid of this node post-haste.
// The actual drop is carried out by the LockHandler destructor.
834 //
835  if (!(NodeTab[NodeID] == theNode))
836  {const char *why = (theNode->isMan ? "dropped as alternate."
837  : "dropped and redirected.");
838  Say.Emsg("Remove_Node", theNode->Ident, why);
839  LockHandler.doDrop = true;
840  return;
841  }
842 
843 
844 // If the node is part of the cluster, do not count it anymore and
845 // indicate new state of this node if we are a reporting manager
846 //
847  if (theNode->isBound)
848  {theNode->isBound = 0;
849  NodeCnt--;
850  if (Config.asManager())
// NOTE(review): embedded line 851 is absent from this listing; the two lines
// below look like the trailing arguments of a call whose first line is not
// shown.
852  theNode->isBad & XrdCmsNode::isSuspend ? 0 : -1,
853  theNode->isNoStage ? 0 : -1);
854  }
855 
856 // If we have a working alternate, substitute it here and immediately drop
857 // the former primary. This allows the cache to remain warm.
858 //
859  if (theNode->isMan && theNode->cidP && !(theNode->cidP->IsSingle())
860  && (altNode = theNode->cidP->RemNode(theNode)))
861  {if (altNode->isBound) NodeCnt++;
862  NodeTab[NodeID] = altNode;
863  if (Config.asManager())
// NOTE(review): embedded line 864 is absent from this listing; as above, the
// first line of the call is not shown.
865  altNode->isBad & XrdCmsNode::isSuspend ? 0 : 1,
866  altNode->isNoStage ? 0 : 1);
867  setAltMan(altNode->NodeID, altNode->Link, altNode->subsPort);
868  Say.Emsg("Manager",altNode->Ident,"replacing dropped",theNode->Ident);
869  LockHandler.doDrop = true;
870  return;
871  }
872 
873 // If this is an immediate drop request, do so now. Drop() will delete
874 // the node object, so remove the node lock and tell LockHandler that.
875 //
876  if (immed || !Config.DRPDelay || theNode->isBad & XrdCmsNode::isDoomed)
877  {theNode->UnLock();
878  LockHandler.myNode = 0;
879  Drop(NodeID, Inst);
880  return;
881  }
882 
883 // If a drop job is already scheduled, update the instance field. Otherwise,
884 // Schedule a node drop at a future time.
885 //
886  theNode->DropTime = time(0)+Config.DRPDelay;
887  if (theNode->DropJob) theNode->DropJob->nodeInst = Inst;
888  else theNode->DropJob = new XrdCmsDrop(NodeID, Inst);
889 
890 // Document removal
891 //
892  if (reason)
893  Say.Emsg("Manager", theNode->Ident, "scheduled for removal;", reason);
894  else DEBUG(theNode->Ident <<" node " <<NodeID <<'.' <<Inst);
895 }
896 
897 /******************************************************************************/
898 /* R e s e t R e f */
899 /******************************************************************************/
900 
901 void XrdCmsCluster::ResetRef(SMask_t nMask, bool isLocked)
902 {
903  XrdCmsNode *nP;
904  bool doAll (nMask == 0);
905 
906 // Obtain a lock on the table if not already locked
907 //
908  if (!isLocked) STMutex.ReadLock();
909 
910 // Reset reference counts as needed. We can do this with a read lock as the
911 // reference counters are atomic.
912 //
913  for (int i = 0; i <= STHi; i++)
914  {if ((nP = NodeTab[i]) && (doAll || nP->isNode(nMask)))
915  {nP->RefW = 0;
916  nP->RefR = 0;
917  nP->Shrem = nP->Share;
918  }
919  }
920 
921 // Unlock table and exit
922 //
923  if (!isLocked) STMutex.UnLock();
924 }
925 
926 /******************************************************************************/
927 /* S e l e c t */
928 /******************************************************************************/
929 
931 {
932  EPNAME("Select");
933  XrdCmsPInfo pinfo;
934  const char *Amode;
935  int dowt = 0, retc = 0, isRW, fRD, noSel = (Sel.Opts & XrdCmsSelect::Defer);
936  SMask_t amask, smask, pmask;
937 
938 // Establish some local options
939 //
940  if (Sel.Opts & XrdCmsSelect::Write)
941  {isRW = 1; Amode = "write";
942  if (Config.RWDelay)
943  if (Sel.Opts & XrdCmsSelect::Create && Config.RWDelay < 2) fRD = 1;
944  else fRD = 0;
945  else fRD = 1;
946  }
947  else {isRW = 0; Amode = "read"; fRD = 1;}
948 
949 // Find out who serves this path
950 //
951  if (!Cache.Paths.Find(Sel.Path.Val, pinfo)
952  || (amask = ((isRW ? pinfo.rwvec : pinfo.rovec) & ~Sel.nmask)) == 0)
953  {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
954  "No servers have %s access to the file", Amode)+1;
955  Sel.Resp.Port = kYR_ENOENT;
956  return EReplete;
957  }
958 
959 // If we are running a shared file system preform an optional restricted
960 // pre-selection and then do a standard selection. Since all nodes are equal,
961 // make sure the client is needlessly avoiding them as this signals an error.
962 //
963  if (baseFS.isDFS())
964  {if (Sel.nmask && !(Sel.Opts & XrdCmsSelect::NoTryLim))
965  {pmask = (isRW ? pinfo.rwvec : pinfo.rovec) & Sel.nmask;
966  if (!(Sel.Opts & XrdCmsSelect::Online))
967  pmask |= pinfo.ssvec & Sel.nmask;
968  if (pmask && maxBits(pmask, baseFS.dfsTries()))
969  {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
970  "Too many attempts to gain dfs %s access to the file", Amode)+1;
971  return RetryErr;
972  }
973  }
974  pmask = amask;
975  smask = (Sel.Opts & XrdCmsSelect::Online ? 0 : pinfo.ssvec & amask);
976  if (baseFS.Trim())
977  {Sel.Resp.DLen = 0;
978  if (!(retc = SelDFS(Sel, amask, pmask, smask, isRW)))
979  return (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
980  if (retc < 0) return retc;
981  } else if (noSel) return 0;
982  return SelNode(Sel, pmask, smask);
983  }
984 
985 // If either a refresh is wanted or we didn't find the file, re-prime the cache
986 // which will force the client to wait. Otherwise, compute the primary and
987 // secondary selections. If there are none, the client may have to wait if we
988 // have servers that we can query regarding the file. Note that for files being
989 // opened in write mode, only one writable copy may exist unless this is a
990 // meta-operation (e.g., remove) in which case the file itself remain unmodified
991 // or a replica request, in which case we select a new target server.
992 //
993  if (!(Sel.Opts & XrdCmsSelect::Refresh)
994  && (retc = Cache.GetFile(Sel, pinfo.rovec)))
995  {if (isRW)
996  { if (retc<0) return Config.LUPDelay;
997  else if (Sel.Opts & XrdCmsSelect::Replica)
998  {pmask = amask & ~(Sel.Vec.hf | Sel.Vec.bf); smask = 0;
999  if (!pmask && !Sel.Vec.bf) return SelFail(Sel,eNoRep);
1000  }
1001  else if (Sel.Vec.bf) pmask = smask = 0;
1002  else if (Sel.Vec.hf)
1003  {if (Sel.Opts & XrdCmsSelect::NewFile) return SelFail(Sel,eExists);
1004  if (!(Sel.Opts & XrdCmsSelect::MWFiles))
1005  {if (!(Sel.Opts & XrdCmsSelect::isMeta)
1006  && maxBits(Sel.Vec.hf,2)) return SelFail(Sel,eDups);
1007  if ((Sel.Vec.hf & pinfo.rwvec)
1008  != (Sel.Vec.hf & pinfo.rovec)) return SelFail(Sel,eROfs);
1009  }
1010  if (!(pmask = Sel.Vec.hf & amask)) return SelFail(Sel,eNoSel);
1011  smask = 0;
1012  }
1013  else if (Sel.Opts & (XrdCmsSelect::Trunc | XrdCmsSelect::NewFile))
1014  {pmask = amask; smask = 0;}
1015  else if ((smask = pinfo.ssvec & amask)) pmask = 0;
1016  else pmask = smask = 0;
1017  } else {
1018  pmask = Sel.Vec.hf & amask;
1019  if (Sel.Opts & XrdCmsSelect::Online) {pmask &= ~Sel.Vec.pf; smask=0;}
1020  else smask = (retc < 0 ? 0 : pinfo.ssvec & amask);
1021  }
1022  if (Sel.Vec.hf & Sel.nmask) Cache.UnkFile(Sel, Sel.nmask);
1023  } else {
1024  Cache.AddFile(Sel, 0);
1025  Sel.Vec.bf = pinfo.rovec;
1026  Sel.Vec.hf = Sel.Vec.pf = pmask = smask = 0;
1027  retc = 0;
1028  }
1029 
1030 // A wait is required if we don't have any primary or secondary servers
1031 //
1032  dowt = (!pmask && !smask);
1033 
1034 // If we can query additional servers, do so now. The client will be placed
1035 // in the callback queue only if we have no possible selections
1036 //
1037  if (Sel.Vec.bf)
1038  {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
1039  if (Sel.Opts & XrdCmsSelect::Refresh)
1041  if (dowt) retc= (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
1042  TRACE(Files, "seeking " <<Sel.Path.Val);
1043  amask = Cluster.Broadcast(Sel.Vec.bf, QReq.Hdr,
1044  (void *)Sel.Path.Val,Sel.Path.Len+1);
1045  if (amask) Cache.UnkFile(Sel, amask);
1046  if (dowt) return retc;
1047  } else if (dowt && retc < 0 && !noSel)
1048  return (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
1049 
1050 // Broadcast a freshen up request if wanted
1051 //
1052  if ((Sel.Opts & XrdCmsSelect::Freshen) && (amask = pmask & ~Sel.Vec.bf))
1054  Cluster.Broadcast(amask, Qupt.Hdr,(void *)Sel.Path.Val,Sel.Path.Len+1);
1055  }
1056 
1057 // If we need to defer selection, simply return as this is a mindless prepare
1058 //
1059  if (noSel) return 0;
1060 
1061 // Check if we have no useable servers
1062 //
1063  if (dowt) return Unuseable(Sel);
1064 
1065 // Check if should eliminate staging servers. We may need to do this if the
1066 // client has been eliminating too many of them as they all should be equal.
1067 //
1068  if (Sel.nmask && pinfo.ssvec && !(Sel.Opts & XrdCmsSelect::NoTryLim)
1069  && maxBits(Sel.nmask & pinfo.ssvec, baseFS.stgTries()))
1070  {if (!pmask)
1071  {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
1072  "Too many attempts to stage %s access to the file", Amode)+1;
1073  return RetryErr;
1074  }
1075  smask = 0;
1076  }
1077 
1078 // Select a node
1079 //
1080  return SelNode(Sel, pmask, smask);
1081 }
1082 
1083 /******************************************************************************/
1084 
1085 int XrdCmsCluster::Select(SMask_t pmask, int &port, char *hbuff, int &hlen,
1086  int isrw, int isMulti, int ifWant)
1087 {
1088  static const SMask_t smLow(255);
1089  XrdCmsSelector selR;
1090  XrdCmsNode *nP = 0;
1091  SMask_t tmask;
1092  int Snum = 0;
1093  XrdNetIF::ifType nType = static_cast<XrdNetIF::ifType>(ifWant);
1094 
1095 // If there is nothing to select from, return failure
1096 //
1097  if (!pmask) return 0;
1098 
1099 // Obtain the network we need for the client
1100 //
1101  selR.needNet = XrdNetIF::Mask(nType);
1102 
1103 // Initialize
1104 //
1105  selR.needSpace = 0;
1106 
1107 // Packed selection can never occur in this code path so we turn it off
1108 //
1109  selR.selPack = 0;
1110 
1111 // If we are exporting a shared-everything system then the incoming mask
1112 // may have more than one server indicated. So, we need to do a full select.
1113 // This is forced when isMulti is true, indicating a choice may exist. Note
1114 // that the node, if any, is returned unlocked but we have the global mutex.
1115 //
1116  if (isMulti || baseFS.isDFS())
1117  {STMutex.ReadLock();
1118  nP = (Config.sched_RR ? SelbyRef(pmask,selR)
1119  : Config.sched_LoadR == 0 ? SelbyLoad(pmask,selR)
1120  : SelbyLoadR(pmask, selR));
1121 
1122  if (nP) hlen = nP->netIF.GetPublicName(hbuff, port) + 1;
1123  else hlen = 0;
1124  STMutex.UnLock();
1125  return hlen != 1;
1126  }
1127 
1128 // In shared-nothing systems the incoming mask will only have a single node.
1129 // Compute the a single node number that is contained in the mask.
1130 //
1131  do {if (!(tmask = pmask & smLow)) Snum += 8;
1132  else {while((tmask = tmask>>1)) Snum++; break;}
1133  } while((pmask = pmask >> 8));
1134 
1135 // See if the node passes muster
1136 //
1137  STMutex.ReadLock();
1138  if ((nP = NodeTab[Snum]))
1139  { if (nP->isBad) nP = 0;
1140  else if (!Config.sched_RR && (nP->myLoad > Config.MaxLoad)) nP = 0;
1141  else if (!(selR.needNet & nP->hasNet)) nP = 0;
1142  if (nP)
1143  {if (isrw)
1144  if (nP->isNoStage || nP->DiskFree < nP->DiskMinF) nP = 0;
1145  else {nP->RefTotW++; nP->RefW++;}
1146  else {nP->RefTotR++; nP->RefR++;}
1147  }
1148  }
1149 
1150 // At this point either we have a node or we do not
1151 //
1152  if (nP)
1153  {hlen = nP->netIF.GetPublicName(hbuff, port) + 1;
1154  nP->RefR++;
1155  STMutex.UnLock();
1156  return hlen != 1;
1157  }
1158  STMutex.UnLock();
1159  return 0;
1160 }
1161 
1162 /******************************************************************************/
1163 /* S e l F a i l */
1164 /******************************************************************************/
1165 
1166 int XrdCmsCluster::SelFail(XrdCmsSelect &Sel, int rc)
1167 {
1168 //
1169  const char *etext, *Item = "file";
1170 
1171  switch(rc)
1172  {case eExists: if (Sel.Opts & XrdCmsSelect::isMeta) Item = "directory";
1173  etext = "Unable to create %s; it already exists.";
1174  Sel.Resp.Port = kYR_RWConflict;
1175  break;
1176  case eROfs: etext = "Unable to modify %s; r/o copy already exists.";
1177  Sel.Resp.Port = kYR_RWConflict;
1178  break;
1179  case eDups: etext = "Unable to modify %s; multiple copies exist.";
1180  Sel.Resp.Port = kYR_RWConflict;
1181  break;
1182  case eNoRep: etext = "Unable to replicate %s; no new sites available.";
1183  Sel.Resp.Port = kYR_noReplicas;
1184  break;
1185  case eNoSel: if (Sel.Vec.hf & Sel.nmask)
1186  {etext = "Unable to access %s; eligible servers shunned.";
1187  if (Sel.Opts & XrdCmsSelect::isDir) Item = "directory";
1188  } else {
1189  if (Sel.Opts & XrdCmsSelect::Write)
1190  {etext = "Unable to write %s; r/w exports not found.";
1191  } else {
1192  etext = "Unable to access %s; it does not exist.";
1193  if (Sel.Opts & XrdCmsSelect::isDir) Item = "directory";
1194  }
1195  }
1196  Sel.Resp.Port = kYR_ENOENT;
1197  break;
1198  default: etext = "Unable to access %s; it does not exist.";
1199  Sel.Resp.Port = kYR_ENOENT;
1200  break;
1201  };
1202 
1203  int n = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data), etext, Item);
1204  if (n < (int)sizeof(Sel.Resp.Data)) Sel.Resp.DLen = n+1;
1205  else Sel.Resp.DLen = sizeof(Sel.Resp.Data);
1206 
1207  return EReplete;
1208 }
1209 
1210 /******************************************************************************/
1211 /* S p a c e */
1212 /******************************************************************************/
1213 
1215 {
1216  XrdCmsNode *nP;
1217  SMask_t bmask;
1218  int i;
1219  bool doAll = !baseFS.isDFS();
1220 
1221 // Obtain a lock on the table and screen out peer nodes
1222 //
1223  STMutex.ReadLock();
1224  bmask = smask & peerMask;
1225 
1226 // Run through the table getting space information
1227 //
1228  for (i = 0; i <= STHi; i++)
1229  if ((nP = NodeTab[i]) && nP->isNode(bmask) && !(nP->isOffline))
1230  {if (doAll || !sData.Total)
1231  {sData.Total += nP->DiskTotal;
1232  sData.TotFr += nP->DiskFree;
1233  }
1234  if (nP->isRW & XrdCmsNode::allowsSS)
1235  {sData.sNum++;
1236  if (sData.sFree < nP->DiskFree)
1237  {sData.sFree = nP->DiskFree; sData.sUtil = nP->DiskUtil;}
1238  }
1239  if (nP->isRW & XrdCmsNode::allowsRW)
1240  {sData.wNum++;
1241  if (sData.wFree < nP->DiskFree)
1242  {sData.wFree = nP->DiskFree; sData.wUtil = nP->DiskUtil;
1243  sData.wMinF = nP->DiskMinF;
1244  }
1245  }
1246  }
1247  STMutex.UnLock();
1248 }
1249 
1250 /******************************************************************************/
1251 /* S t a t s */
1252 /******************************************************************************/
1253 
1254 int XrdCmsCluster::Stats(char *bfr, int bln)
1255 {
1256  static const char statfmt1[] = "<stats id=\"cms\">"
1257  "<role>%s</role></stats>";
1258  int mlen;
1259 
1260 // Check if actual length wanted
1261 //
1262  if (!bfr) return sizeof(statfmt1) + 8;
1263 
1264 // Format the statistics (not much here for now)
1265 //
1266  mlen = snprintf(bfr, bln, statfmt1, Config.myRType);
1267 
1268  if ((bln -= mlen) <= 0) return 0;
1269  return mlen;
1270 }
1271 
1272 /******************************************************************************/
1273 /* S t a t t */
1274 /******************************************************************************/
1275 
1276 int XrdCmsCluster::Statt(char *bfr, int bln)
1277 {
1278  static const char statfmt0[] = "</stats>";
1279  static const char statfmt1[] = "<stats id=\"cmsm\">"
1280  "<role>%s</role><sel><t>%lld</t><r>%lld</r><w>%lld</w></sel>"
1281  "<node>%d";
1282  static const char statfmt2[] = "<stats id=\"%d\">"
1283  "<host>%s</host><role>%s</role>"
1284  "<run>%s</run><ref><r>%d</r><w>%d</w></ref>%s</stats>";
1285  static const char statfmt3[] = "<shr>%d<use>%d</use></shr>";
1286  static const char statfmt4[] = "</node>";
1287  static const char statfmt5[] =
1288  "<frq><add>%lld<d>%lld</d></add><rsp>%lld<m>%lld</m></rsp>"
1289  "<lf>%lld</lf><ls>%lld</ls><rf>%lld</rf><rs>%lld</rs></frq>";
1290 
1291  static int AddFrq = (Config.RepStats & XrdCmsConfig::RepStat_frq);
1292  static int AddShr = (Config.RepStats & XrdCmsConfig::RepStat_shr)
1293  && Config.asMetaMan();
1294 
1295  XrdCmsRRQ::Info Frq;
1296  XrdCmsSelected *sp;
1297  int mlen, tlen, n = 0;
1298  char shrBuff[80], stat[6], *stp;
1299  bool oksel;
1300 
1301  class spmngr {
1302  public: XrdCmsSelected *sp;
1303 
1304  spmngr() {sp = 0;}
1305  ~spmngr() {XrdCmsSelected *xsp;
1306  while((xsp = sp)) {sp = sp->next; delete xsp;}
1307  }
1308  } mngrsp;
1309 
1310 // Check if actual length wanted
1311 //
1312  if (!bfr)
1313  {n = sizeof(statfmt0) +
1314  sizeof(statfmt1) + 12*3 + 3 + 3 +
1315  (sizeof(statfmt2) + 10*2 + 256 + 16) * STMax + sizeof(statfmt4);
1316  if (AddShr) n += sizeof(statfmt3) + 12;
1317  if (AddFrq) n += sizeof(statfmt4) + (10*8);
1318  return n;
1319  }
1320 
1321 // Get the statistics
1322 //
1323  if (AddFrq) RRQ.Statistics(Frq);
1324  mngrsp.sp = sp = List(FULLMASK, LS_NULL, oksel);
1325 
1326 // Count number of nodes we have
1327 //
1328  while(sp) {n++; sp = sp->next;}
1329  sp = mngrsp.sp;
1330 
1331 // Format the statistics
1332 //
1333  long long lclTcnt = SelTcnt, lclRtot = SelRtot, lclWtot = SelWtot;
1334  mlen = snprintf(bfr, bln, statfmt1,
1335  Config.myRType, lclTcnt, lclRtot, lclWtot, n);
1336 
1337  if ((bln -= mlen) <= 0) return 0;
1338  tlen = mlen; bfr += mlen; n = 0; *shrBuff = 0;
1339 
1340  while(sp && bln > 0)
1341  {stp = stat;
1342  if (sp->Status & XrdCmsSelected::Offline) *stp++ = 'o';
1343  else if (sp->Status & XrdCmsSelected::Suspend) *stp++ = 's';
1344  else if (sp->Status & XrdCmsSelected::Disable) *stp++ = 'd';
1345  else *stp++ = 'a';
1346  if (sp->Status & XrdCmsSelected::isRW) *stp++ = 'w';
1347  if (sp->Status & XrdCmsSelected::NoStage) *stp++ = 'n';
1348  *stp = 0;
1349  if (AddShr) snprintf(shrBuff, sizeof(shrBuff), statfmt3,
1350  (sp->Share ? sp->Share : 100), sp->Shrin);
1351  mlen = snprintf(bfr, bln, statfmt2, n, sp->Ident,
1352  XrdCmsRole::Type(static_cast<XrdCmsRole::RoleID>(sp->RoleID)),
1353  stat, sp->RefTotR, sp->RefTotW, shrBuff);
1354  bfr += mlen; bln -= mlen; tlen += mlen;
1355  sp = sp->next; n++;
1356  }
1357 
1358  if (bln <= (int)sizeof(statfmt4)) return 0;
1359  strcpy(bfr, statfmt4); mlen = sizeof(statfmt4) - 1;
1360  bfr += mlen; bln -= mlen; tlen += mlen;
1361 
1362  if (AddFrq && bln > 0)
1363  {mlen = snprintf(bfr, bln, statfmt5, Frq.Add2Q, Frq.PBack, Frq.Resp,
1364  Frq.Multi, Frq.luFast, Frq.luSlow, Frq.rdFast, Frq.rdSlow);
1365  bfr += mlen; bln -= mlen; tlen += mlen;
1366  }
1367 
1368 // See if we overflowed. otherwise finish up
1369 //
1370  if (sp || bln < (int)sizeof(statfmt0)) return 0;
1371  strcpy(bfr, statfmt0);
1372  return tlen + sizeof(statfmt0) - 1;
1373 }
1374 
1375 /******************************************************************************/
1376 /* P r i v a t e M e t h o d s */
1377 /******************************************************************************/
1378 /******************************************************************************/
1379 /* c a l c D e l a y */
1380 /******************************************************************************/
1381 
1382 XrdCmsNode *XrdCmsCluster::calcDelay(XrdCmsSelector &selR)
1383 {
1384  if (!selR.nPick) {selR.delay = 0;
1385  selR.reason = (selR.xNoNet
1386  ? "no eligible servers reachable for"
1387  : "no eligible servers for");
1388  }
1389  else if (selR.xFull) {selR.delay = Config.DiskWT;
1390  selR.reason = "no eligible servers have space for";
1391  }
1392  else if (selR.xOvld) {selR.delay = Config.MaxDelay;
1393  selR.reason = "eligible servers overloaded for";
1394  }
1395  else if (selR.xSusp) {selR.delay = Config.SUSDelay;
1396  selR.reason = "eligible servers suspended for";
1397  }
1398  else if (selR.xOff) {selR.delay = Config.SUPDelay;
1399  selR.reason = "eligible servers offline for";
1400  }
1401  else {selR.delay = Config.SUPDelay;
1402  selR.reason = "server selection error for";
1403  }
1404  return (XrdCmsNode *)0;
1405 }
1406 
1407 /******************************************************************************/
1408 /* D r o p */
1409 /******************************************************************************/
1410 
1411 // Warning: STMutex must be locked in write upon entry and the caller must
1412 // release it if this method is called directily. Otherwise, the mutex
1413 // will be obtained and released. Also, this method may only be called
1414 // via Remove() either directly or via a deferred job scheduled by that
1415 // method. This method actually deletes the node object.
1416 
1417 int XrdCmsCluster::Drop(int sent, int sinst, XrdCmsDrop *djp)
1418 {
1419  EPNAME("Drop_Node")
1420  XrdCmsNode *nP;
1421  char hname[512];
1422 
1423 // If we are being called outside of a scheduled job, obtain the mutex
1424 //
1425  if (djp) STMutex.WriteLock();
1426 
1427 // Make sure this node is the right one
1428 //
1429  if (!(nP = NodeTab[sent]) || nP->Inst() != sinst)
1430  {if (nP && djp == nP->DropJob) {nP->DropJob = 0; nP->DropTime = 0;}
1431  if (djp) STMutex.UnLock();
1432  DEBUG(sent <<'.' <<sinst <<" cancelled.");
1433  return 0;
1434  }
1435 
1436 // Check if the drop has been rescheduled
1437 //
1438  if (djp && time(0) < nP->DropTime)
1439  {Sched->Schedule((XrdJob *)djp, nP->DropTime);
1440  if (djp) STMutex.UnLock();
1441  return 1;
1442  }
1443 
1444 // Save the node name (don't want to hold a lock across a message)
1445 //
1446  strlcpy(hname, nP->Ident, sizeof(hname));
1447 
1448 // Cleanup status
1449 //
1450  NodeTab[sent] = 0;
1451  nP->isOffline = 1; // STMutex is locked in write mode
1452  nP->DropTime = 0;
1453  nP->DropJob = 0;
1454  nP->isBound = 0;
1455 
1456 // Remove node from the peer list (if it is one)
1457 //
1458  if (nP->isPeer) {peerHost &= nP->NodeMask; peerMask = ~peerHost;}
1459 
1460 // Remove node entry from the alternate list and readjust the end pointer.
1461 //
1462  if (nP->isMan)
1463  {memset((void *)&AltMans[sent*AltSize], (int)' ', AltSize);
1464  if (sent == AltMent)
1465  {AltMent--;
1466  while(AltMent >= 0 && NodeTab[AltMent]
1467  && !NodeTab[AltMent]->isMan) AltMent--;
1468  if (AltMent < 0) AltMend = AltMans;
1469  else AltMend = AltMans + ((AltMent+1)*AltSize);
1470  }
1471  }
1472 
1473 // Readjust STHi
1474 //
1475  if (sent == STHi) while(STHi >= 0 && !NodeTab[STHi]) STHi--;
1476 
1477 // Invalidate any cached entries for this node
1478 //
1479  if (nP->NodeMask) Cache.Drop(nP->NodeMask, sent, STHi);
1480 
1481 // We can now delete the node object if we were called via a job as we are on
1482 // a different thread. Direct calls require that we schedule the deletion as
1483 // it may take a long time if there are oustanding references to this node.
1484 //
1485  if (djp) {STMutex.UnLock(); nP->Delete(STMutex);}
1486  else nP->DropJob = new XrdCmsDrop(nP);
1487 
1488 // Document the drop
1489 //
1490  Say.Emsg("Drop_Node", hname, "dropped.");
1491  return 0;
1492 }
1493 
1494 /******************************************************************************/
1495 /* M u l t i p l e */
1496 /******************************************************************************/
1497 
1498 int XrdCmsCluster::Multiple(SMask_t mVec)
1499 {
1500  static const unsigned long long Left32 = 0xffffffff00000000LL;
1501  static const unsigned long long Right32 = 0x00000000ffffffffLL;
1502  static const unsigned long long Left16 = 0x00000000ffff0000LL;
1503  static const unsigned long long Right16 = 0x000000000000ffffLL;
1504  static const unsigned long long Left08 = 0x000000000000ff00LL;
1505  static const unsigned long long Right08 = 0x00000000000000ffLL;
1506  static const unsigned long long Left04 = 0x00000000000000f0LL;
1507  static const unsigned long long Right04 = 0x000000000000000fLL;
1508 // 0 1 2 3 4 5 6 7 8 9 A B C D E F
1509  static const int isMult[16] = {0,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1};
1510 
1511  if (mVec & Left32) {if (mVec & Right32) return 1;
1512  else mVec = mVec >> 32LL;
1513  }
1514  if (mVec & Left16) {if (mVec & Right16) return 1;
1515  else mVec = mVec >> 16LL;
1516  }
1517  if (mVec & Left08) {if (mVec & Right08) return 1;
1518  else mVec = mVec >> 8LL;
1519  }
1520  if (mVec & Left04) {if (mVec & Right04) return 1;
1521  else mVec = mVec >> 4LL;
1522  }
1523  return isMult[mVec];
1524 }
1525 
1526 /******************************************************************************/
1527 /* m a x B i t s */
1528 /******************************************************************************/
1529 
1530 bool XrdCmsCluster::maxBits(SMask_t mVec, int mbits)
1531 {
1532  int count = 0;
1533 
1534 // Count bits. This is the fastest way assuming few bits are set
1535 //
1536  while(mVec)
1537  {mVec &= (mVec - 1);
1538  count++;
1539  if (count >= mbits) return true;
1540  }
1541 
1542 // Indicate we have not reached the maximum bits set
1543 //
1544  return false;
1545 }
1546 
1547 /******************************************************************************/
1548 /* R e c o r d */
1549 /******************************************************************************/
1550 
1551 void XrdCmsCluster::Record(char *path, const char *reason, bool force)
1552 {
1553  EPNAME("Record")
1554  static int msgcnt = 255;
1555  static XrdSysMutex mcMutex;
1556  int skipmsg;
1557 
1558  DEBUG(reason <<path);
1559  mcMutex.Lock();
1560  msgcnt++; skipmsg = msgcnt & (force ? 0x0f : 0xff);
1561  mcMutex.UnLock();
1562 
1563  if (!skipmsg) Say.Emsg(epname, "client deferred;", reason, path);
1564 }
1565 
1566 /******************************************************************************/
1567 /* S e l N o d e */
1568 /******************************************************************************/
1569 
1570 int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
1571 {
1572  EPNAME("SelNode")
1573  const char *act=0;
1574  int affsel = 1, count = 0, isalt = 0, pass = 2;
1575  SMask_t mask;
1576  XrdCmsNode *nP = 0;
1577  XrdCmsSelector selR;
1578  XrdNetIF::ifType nType=(XrdNetIF::ifType)(Sel.Opts & XrdCmsSelect::ifWant);
1579 
1580 // Obtain the network we need for the client
1581 //
1582  selR.needNet = XrdNetIF::Mask(nType);
1583 
1584 // Indicate whether or not stable selection is required
1585 //
1586  if (!(Sel.Opts & XrdCmsSelect::Pack)) selR.selPack = 0;
1587  else {unsigned int theHash = (Sel.Opts & XrdCmsSelect::UseAH
1588  ? Sel.AltHash : Sel.Path.Hash);
1589  SMask_t sVec = pmask;
1590  for (count = 0; sVec; count++) sVec &= (sVec - 1);
1591  if (count > 1) selR.selPack = affsel = (theHash % count) + 1;
1592  else selR.selPack = 0;
1593  }
1594 
1595 // There is a difference bwteen needing space and needing r/w access. The former
1596 // is needed when we will be writing data the latter for inode modifications.
1597 //
1598  if (Sel.Opts & XrdCmsSelect::isMeta) selR.needSpace = 0;
1599  else selR.needSpace = (Sel.Opts & XrdCmsSelect::Write
1600  ? XrdCmsNode::allowsRW : 0);
1601 
1602 // Scan for a primary and alternate node (alternates do staging). At this
1603 // point we omit all peer nodes as they are our last resort. Note that Selbyxxx
1604 // returns the node unlocked but we have the global mutex so that is OK.
1605 //
1606  STMutex.ReadLock();
1607  mask = pmask & peerMask;
1608  while(pass--)
1609  {if (mask)
1610  {nP = (Config.sched_RR || (Sel.Opts & XrdCmsSelect::UseRef)
1611  ? SelbyRef(mask,selR)
1612  : Config.sched_LoadR == 0 ? SelbyLoad(pmask,selR)
1613  : SelbyLoadR(pmask, selR));
1614  if (nP || (selR.nPick && selR.delay)
1615  || NodeCnt < Config.SUPCount) break;
1616  }
1617  mask = amask & peerMask; isalt = XrdCmsNode::allowsSS;
1618  if (!(Sel.Opts & XrdCmsSelect::isMeta)) selR.needSpace |= isalt;
1619  }
1620 
1621 // Produce affinity result trace
1622 //
1623  if (Sel.Opts & XrdCmsSelect::Pack && nP)
1624  {TRACE(Redirect, "affinity " <<affsel <<'/' <<count <<'/'
1625  <<(int)selR.selPack <<(selR.selPack ? " go " : " ng ")
1626  <<nP->Name() <<' ' <<Sel.Path.Val);
1627  }
1628 
1629 // If we found an eligible node then dispatch the client to it. We will
1630 // swap the global mutex for the node mutex to minimize interefrence.
1631 //
1632  if (nP)
1633  {nP->g2nLock(STMutex);
1634  Sel.Resp.DLen = nP->netIF.GetPublicName(Sel.Resp.Data, Sel.Resp.Port);
1635  if (!Sel.Resp.DLen) {nP->UnLock(); return Unreachable(Sel, false);}
1636  Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1637 
1638  // If a message is to be sent to the selected server, send it.
1639  //
1640  if (Sel.iovN && Sel.iovP) nP->Send(Sel.iovP, Sel.iovN);
1641 
1642  // Do special post proccessing when any of:
1643  // a) isalt true: Secondary selection occurred
1644  // b) Create set: File creation will occur
1645  //
1646  if (isalt || (Sel.Opts & XrdCmsSelect::Create))
1648  if (Sel.Opts & XrdCmsSelect::noBind) act = " handling ";
1649  else Cache.AddFile(Sel, nP->NodeMask);
1650  }
1651 
1652  // Determine what we are actually doing here
1653  //
1654  nP->UnLock();
1655  if (!act)
1656  {if (isalt) act = (Sel.iovN ? " staging " : " assigned ");
1657  else act = " serving ";
1658  }
1659  TRACE(Stage, Sel.Resp.Data <<act <<Sel.Path.Val);
1660  return 0;
1661  }
1662 
1663 // No node so check if we have a sufficient number to continue. Note that we
1664 // do not forward to a peer unless we have a suffficient number of local nodes.
1665 //
1666  if (!selR.delay && NodeCnt < Config.SUPCount)
1667  {STMutex.UnLock();
1668  Record(Sel.Path.Val, "insufficient number of nodes", true);
1669  return Config.SUPDelay;
1670  }
1671 
1672 // Return delay if we should avoid selecting a peer manager
1673 //
1674  if (selR.delay && selR.delay < Config.PSDelay)
1675  {STMutex.UnLock();
1676  Record(Sel.Path.Val, selR.reason);
1677  return selR.delay;
1678  }
1679 
1680 // At this point, we attempt a peer node selection (choice of last resort). Note
1681 // that we are still holding the global lock! If we find a peer node we will
1682 // swap it with the node lock.
1683 //
1684  if (Sel.Opts & XrdCmsSelect::Peers)
1685  {const char *reason1 = selR.reason;
1686  int delay1 = selR.delay;
1687  bool noNet = selR.xNoNet;
1688  if ((mask = (pmask | amask) & peerHost)) nP = SelbyCost(mask, selR);
1689  if (nP)
1690  {nP->g2nLock(STMutex);
1691  Sel.Resp.DLen = nP->netIF.GetPublicName(Sel.Resp.Data,Sel.Resp.Port);
1692  if (!Sel.Resp.DLen) {nP->UnLock(); return Unreachable(Sel, false);}
1693  Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1694  if (Sel.iovN && Sel.iovP) nP->Send(Sel.iovP, Sel.iovN);
1695  nP->UnLock();
1696  TRACE(Stage, "Peer " <<Sel.Resp.Data <<" handling " <<Sel.Path.Val);
1697  return 0;
1698  }
1699  if (!selR.delay)
1700  {selR.delay = delay1; selR.reason = reason1; selR.xNoNet = noNet;}
1701  }
1702 
1703 // At this point we don't need the global lock so let it go.
1704 //
1705  STMutex.UnLock();
1706 
1707 // At this point we either don't have enough nodes or simply can't handle this
1708 //
1709  if (selR.delay)
1710  {Record(Sel.Path.Val, selR.reason);
1711  return selR.delay;
1712  }
1713 
1714 // Return appropriate error message
1715 //
1716  if (selR.xNoNet) return Unreachable(Sel, true);
1717  return Unuseable(Sel);
1718 }
1719 
1720 /******************************************************************************/
1721 /* R e f C o u n t */
1722 /******************************************************************************/
1723 
// This snippet of code occurs often enough so that we make it a macro as we
// want to execute this inline. It counts a selection against node sP: the
// write counters when NeedSpace is set and the read counters otherwise. When
// sPMulti is set and the node has a Share, Shrem is counted down; once it is
// found at zero both reference counts are advanced by Shrip, Shrem is reset
// to Share and Shrin is incremented.
//
// The body is wrapped in do/while(0) so that the macro expands to a single
// statement and is safe in an unbraced if/else; the arguments are
// parenthesized so that any pointer expression may be passed.
//
#define RefCount(sP, sPMulti, NeedSpace) \
        do {if (NeedSpace) {(sP)->RefTotW++; (sP)->RefW++;} \
               else        {(sP)->RefTotR++; (sP)->RefR++;} \
            if ((sPMulti) && (sP)->Share && !(sP)->Shrem--) \
               {(sP)->RefW += (sP)->Shrip; (sP)->RefR += (sP)->Shrip; \
                (sP)->Shrem = (sP)->Share; (sP)->Shrin++; \
               } \
           } while(0)
1734 
1735 /******************************************************************************/
1736 /* S e l b y C o s t */
1737 /******************************************************************************/
1738 
1739 // Cost selection is used only for peer node selection as peers do not
1740 // report a load and handle their own scheduling.
1741 
1742 // Caller must have the STMutex locked. The returned node, if any, is unlocked.
1743 
1744 XrdCmsNode *XrdCmsCluster::SelbyCost(SMask_t mask, XrdCmsSelector &selR)
1745 {
1746  XrdCmsNode *np, *sp = 0;
1747  bool Multi = false;
1748 
1749 // Scan for a node (sp points to the selected one)
1750 //
1751  selR.Reset(); SelTcnt++;
1752  for (int i = 0; i <= STHi; i++)
1753  if ((np = NodeTab[i]) && (np->NodeMask & mask))
1754  {if (!(selR.needNet & np->hasNet)) {selR.xNoNet= true; continue;}
1755  selR.nPick++;
1756  if (np->isOffline) {selR.xOff = true; continue;}
1757  if (np->isBad) {selR.xSusp = true; continue;}
1758  if (selR.needSpace && np->isNoStage) {selR.xFull = true; continue;}
1759  if (!sp) sp = np;
1760  else{if (abs(sp->myCost - np->myCost) <= Config.P_fuzz)
1761  { if (selR.selPack)
1762  {if (--selR.selPack) sp=np;
1763  else break;
1764  }
1765  else if (selR.needSpace)
1766  {if (sp->RefW > (np->RefW+Config.DiskLinger))
1767  sp=np;
1768  }
1769  else if (sp->RefR > np->RefR) sp=np;
1770  }
1771  else if (sp->myCost > np->myCost) sp=np;
1772  Multi = true;
1773  }
1774  }
1775 
1776 // Check for overloaded node and return result
1777 //
1778  if (!sp) return calcDelay(selR);
1779  RefCount(sp, Multi, selR.needSpace);
1780  return sp;
1781 }
1782 
1783 /******************************************************************************/
1784 /* S e l b y L o a d */
1785 /******************************************************************************/
1786 
1787 // Caller must have the STMutex locked. The returned node, if any, is unlocked.
1788 
1789 XrdCmsNode *XrdCmsCluster::SelbyLoad(SMask_t mask, XrdCmsSelector &selR)
1790 {
1791  XrdCmsNode *np, *sp = 0;
1792  bool Multi = false, reqSS = (selR.needSpace & XrdCmsNode::allowsSS) != 0;
1793 
1794 // Scan for a node (preset possible, suspended, overloaded, full, and dead)
1795 //
1796  selR.Reset(); SelTcnt++;
1797  for (int i = 0; i <= STHi; i++)
1798  if ((np = NodeTab[i]) && (np->NodeMask & mask))
1799  {if (!(selR.needNet & np->hasNet)) {selR.xNoNet= true; continue;}
1800  selR.nPick++;
1801  if (np->isOffline) {selR.xOff = true; continue;}
1802  if (np->isBad) {selR.xSusp = true; continue;}
1803  if (np->myLoad > Config.MaxLoad) {selR.xOvld = true; continue;}
1804  if (selR.needSpace && (np->DiskFree < np->DiskMinF
1805  || (reqSS && np->isNoStage)))
1806  {selR.xFull = true; continue;}
1807  if (!sp) sp = np;
1808  else{if (selR.needSpace)
1809  {if (abs(sp->myMass - np->myMass) <= Config.P_fuzz)
1810  {if (sp->RefW > (np->RefW+Config.DiskLinger)) sp=np;}
1811  else if (sp->myMass > np->myMass) sp=np;
1812  } else {
1813  if (abs(sp->myLoad - np->myLoad) <= Config.P_fuzz)
1814  {if (selR.selPack)
1815  {if (--selR.selPack) sp=np;
1816  else break;
1817  }
1818  else if (sp->RefR > np->RefR) sp=np;
1819  }
1820  else if (sp->myLoad > np->myLoad) sp=np;
1821  }
1822  Multi = true;
1823  }
1824  }
1825 
1826 // Check for overloaded node and return result
1827 //
1828  if (!sp) return calcDelay(selR);
1829  RefCount(sp, Multi, selR.needSpace);
1830  return sp;
1831 }
1832 
1833 /******************************************************************************/
1834 /* S e l b y L o a d R */
1835 /******************************************************************************/
1836 
1837 // Caller must have the STMutex locked. The returned node, if any, is unlocked.
1838 
1839 XrdCmsNode *XrdCmsCluster::SelbyLoadR(SMask_t mask, XrdCmsSelector &selR)
1840 {
1841  static std::random_device rand_dev;
1842  static std::default_random_engine generator(rand_dev());
1843 
1844  XrdCmsNode *np = nullptr, *sp = nullptr;
1845  bool reqSS = (selR.needSpace & XrdCmsNode::allowsSS) != 0;
1846 
1847  // Scan for a node (preset possible, suspended, overloaded, full, and dead)
1848 
1849  selR.Reset();
1850  SelTcnt++;
1851 
1852  int totWeight = 0;
1853 
1854  for (int i = 0; i <= STHi; ++i) {
1855  NodeWeight[i] = 0; // make node unselectable first
1856 
1857  if (!((np = NodeTab[i]) && (np->NodeMask & mask)))
1858  continue;
1859 
1860  if (!(selR.needNet & np->hasNet)) { selR.xNoNet = true; continue; }
1861 
1862  selR.nPick++;
1863 
1864  if (np->isOffline) { selR.xOff = true; continue; }
1865  if (np->isBad) { selR.xSusp = true; continue; }
1866  if (np->myLoad > Config.MaxLoad) { selR.xOvld = true; continue; }
1867 
1868  if (selR.needSpace) {
1869  if (np->DiskFree < np->DiskMinF || (reqSS && np->isNoStage)) {
1870  selR.xFull = true;
1871  continue;
1872  }
1873  }
1874 
1875  // If node passes filters, give it a weight
1876  totWeight += Config.P_fuzz + (100 - np->myLoad);
1877  NodeWeight[i] = totWeight;
1878  }
1879 
1880  std::uniform_int_distribution<int> distr(1, totWeight);
1881  int selected = distr(generator);
1882 
1883  for (int i = 0; i <= STHi; ++i) {
1884  if (NodeWeight[i] < selected)
1885  continue;
1886 
1887  sp = NodeTab[i];
1888  break;
1889  }
1890 
1891  return sp ? sp : calcDelay(selR);
1892 }
1893 
1894 /******************************************************************************/
1895 /* S e l b y R e f */
1896 /******************************************************************************/
1897 
1898 // Caller must have the STMutex locked. The returned node, if any, is unlocked.
1899 
1900 XrdCmsNode *XrdCmsCluster::SelbyRef(SMask_t mask, XrdCmsSelector &selR)
1901 {
1902  XrdCmsNode *np, *sp = 0;
1903  bool Multi = false, reqSS = (selR.needSpace & XrdCmsNode::allowsSS) != 0;
1904 
1905 // Scan for a node (sp points to the selected one)
1906 //
1907  selR.Reset(); SelTcnt++;
1908  for (int i = 0; i <= STHi; i++)
1909  if ((np = NodeTab[i]) && (np->NodeMask & mask))
1910  {if (!(selR.needNet & np->hasNet)) {selR.xNoNet= true; continue;}
1911  selR.nPick++;
1912  if (np->isOffline) {selR.xOff = true; continue;}
1913  if (np->isBad) {selR.xSusp = true; continue;}
1914  if (selR.needSpace && (np->DiskFree < np->DiskMinF
1915  || (reqSS && np->isNoStage)))
1916  {selR.xFull = true; continue;}
1917  if (!sp) sp = np;
1918  else {Multi = true;
1919  if (selR.selPack)
1920  {if (--selR.selPack) sp=np;
1921  else break;
1922  }
1923  else if (selR.needSpace)
1924  {if (sp->RefW > (np->RefW+Config.DiskLinger)) sp=np;}
1925  else if (sp->RefR > np->RefR) sp=np;
1926  }
1927  }
1928 
1929 // Check for overloaded node and return result
1930 //
1931  if (!sp) return calcDelay(selR);
1932  RefCount(sp, Multi, selR.needSpace);
1933  return sp;
1934 }
1935 
1936 /******************************************************************************/
1937 /* S e l D F S */
1938 /******************************************************************************/
1939 
// Establish whether the file named by Sel.Path exists, consulting the cache
// first and then either the cluster or the local file system.
//
// Sel   - the selection request; Sel.Vec is filled in and Sel.Opts is changed
//         only temporarily.
// amask - mask handed to the cache lookup and the broadcast; after a
//         successful local check it becomes the "has file" vector.
// pmask - zeroed when an online request finds the file pending.
// smask - zeroed in that same case and when an existing file is opened for
//         write with truncation.
// isRW  - non-zero when the file is to be written.
//
// Returns 0 when the lookup was handed to the cluster (no local file system),
// 1 when the caller may continue with the selection, otherwise the value
// produced by SelFail().
//
1940 int XrdCmsCluster::SelDFS(XrdCmsSelect &Sel, SMask_t amask,
1941  SMask_t &pmask, SMask_t &smask, int isRW)
1942 {
1943  EPNAME("SelDFS");
1944  static const SMask_t allNodes(~0);
1945  int oldOpts, rc;
1946 
1947 // The first task is to find out if the file exists somewhere. If we are doing
1948 // local queries, then the answer will be immediate. Otherwise, forward it.
1949 //
1950  if ((Sel.Opts & XrdCmsSelect::Refresh) || !(rc = Cache.GetFile(Sel, amask)))
1951  {if (!baseFS.Local())
1952  {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
1953  TRACE(Files, "seeking " <<Sel.Path.Val);
1954  Cache.AddFile(Sel, 0);
1955  if (Sel.Opts & XrdCmsSelect::Refresh)
// NOTE(review): the embedded numbering skips 1956, so the statement governed
// by the 'if' above appears to be missing from this listing. As shown, the
// broadcast below would run only for refresh requests -- confirm against the
// original source before relying on this.
1957  Cluster.Broadsend(amask, QReq.Hdr, Sel.Path.Val, Sel.Path.Len+1);
1958  return 0;
1959  }
// Local file system: a negative result is cached with an empty mask and all
// vectors are cleared; otherwise the vectors are derived from amask.
1960  if ((rc = baseFS.Exists(Sel.Path.Val, -Sel.Path.Len)) < 0)
1961  {Cache.AddFile(Sel, 0);
1962  Sel.Vec.bf = Sel.Vec.pf = Sel.Vec.wf = Sel.Vec.hf = 0;
1963  } else {
1964  Sel.Vec.hf = amask; Sel.Vec.wf = (isRW ? amask : 0);
// The Pending option is set only for the AddFile() call; the caller's options
// are put back right after it.
1965  oldOpts = Sel.Opts;
1966  if (rc != CmsHaveRequest::Pending) Sel.Vec.pf = 0;
1967  else {Sel.Vec.pf = amask; Sel.Opts |= XrdCmsSelect::Pending;}
1968  Cache.AddFile(Sel, allNodes);
1969  Sel.Opts = oldOpts;
1970  }
1971  }
1972 
1973 // Screen out online requests where the file is pending
1974 //
1975  if (Sel.Opts & XrdCmsSelect::Online && Sel.Vec.pf)
1976  {pmask = smask = 0;
1977  return 1;
1978  }
1979 
1980 // If the file is to be written and the files exists then it can't be a new file
1981 //
1982  if (isRW && Sel.Vec.hf)
1983  {if (Sel.Opts & XrdCmsSelect::NewFile) return SelFail(Sel,eExists);
1984  if (Sel.Opts & XrdCmsSelect::Trunc) smask = 0;
1985  return 1;
1986  }
1987 
1988 // Final verification that we have something to select
1989 //
// A missing file is acceptable only for a write that creates or truncates.
1990  if (!Sel.Vec.hf
1991  && (!isRW || !(Sel.Opts & (XrdCmsSelect::Trunc | XrdCmsSelect::NewFile))))
1992  return SelFail(Sel, eNoEnt);
1993  return 1;
1994 }
1995 
1996 /******************************************************************************/
1997 /* s e n d A L i s t */
1998 /******************************************************************************/
1999 
2000 // Single entry at a time, protected by STMutex in write mode!
2001 
2002 void XrdCmsCluster::sendAList(XrdLink *lp)
2003 {
2004  static CmsTryRequest Req = {{0, kYR_try, 0, 0}, 0};
2005  static int HdrSize = sizeof(Req.Hdr) + sizeof(Req.sLen);
2006  static char *AltNext = AltMans;
2007  static struct iovec iov[4] = {{(caddr_t)&Req, (size_t)HdrSize},
2008  {0, 0},
2009  {AltMans, 0},
2010  {(caddr_t)"\0", 1}};
2011  int dlen;
2012 
2013 // Calculate what to send
2014 //
2015  AltNext = AltNext + AltSize;
2016  if (AltNext >= AltMend)
2017  {AltNext = AltMans;
2018  iov[1].iov_len = 0;
2019  iov[2].iov_len = dlen = AltMend - AltMans;
2020  } else {
2021  iov[1].iov_base = (caddr_t)AltNext;
2022  iov[1].iov_len = AltMend - AltNext;
2023  iov[2].iov_len = AltNext - AltMans;
2024  dlen = iov[1].iov_len + iov[2].iov_len;
2025  }
2026 
2027 // Complete the request (account for trailing null character)
2028 //
2029  dlen++;
2030  Req.Hdr.datalen = htons(static_cast<unsigned short>(dlen+sizeof(Req.sLen)));
2031  Req.sLen = htons(static_cast<unsigned short>(dlen));
2032 
2033 // Send the list of alternates (rotated once)
2034 //
2035  lp->Send(iov, 4, dlen+HdrSize);
2036 }
2037 
2038 /******************************************************************************/
2039 /* s e t A l t M a n */
2040 /******************************************************************************/
2041 
2042 // Single entry at a time, protected by STMutex in write mode!
2043 
2044 void XrdCmsCluster::setAltMan(int snum, XrdLink *lp, int port)
2045 {
2046  XrdNetAddr altAddr = *(lp->NetAddr());
2047  char *ap = &AltMans[snum*AltSize];
2048  int i;
2049 
2050 // Preset the buffer and pre-screen the port number
2051 //
2052  if (!port || (port > 0x0000ffff)) port = Config.PortTCP;
2053  memset(ap, int(' '), AltSize);
2054 
2055 // First tr to use the hostname:port which may be too large (unlikely). Else
2056 // Insert the ip address of this node into the list of nodes. We made sure that
2057 // the size of he buffer was big enough so no need to check for overflow.
2058 //
2059  altAddr.Port(port);
2060  if (Config.DoHnTry) i = altAddr.Format(ap, AltSize, XrdNetAddr::fmtName);
2061  else i = 0;
2062  if (!i) i=altAddr.Format(ap,AltSize,XrdNetAddr::fmtAddr,XrdNetAddr::prefipv4);
2063  ap[i] = ' ';
2064 
2065 // Compute new fence
2066 //
2067  if (ap >= AltMend) {AltMend = ap + AltSize; AltMent = snum;}
2068 }
2069 
2070 /******************************************************************************/
2071 /* U n r e a c h a b l e */
2072 /******************************************************************************/
2073 
// Build the "network unreachable" reply for a selection request.
//
// Sel  - the request; its Resp.Data, Resp.DLen and Resp.Port are filled in.
// none - true selects the "No servers are reachable" text, false selects the
//        "Eligible server is unreachable" text.
//
// Returns EReplete.
//
// NOTE(review): nType is used below but is not declared anywhere in this
// listing. The embedded numbering skips from 2075 to 2077, so its declaration
// was most likely lost in extraction -- confirm against the original source.
//
// NOTE(review): snprintf() returns the length the text would have had, so
// DLen could exceed the buffer size if the message were ever truncated.
// Unuseable() clamps the length instead -- consider doing the same here.
//
2074 int XrdCmsCluster::Unreachable(XrdCmsSelect &Sel, bool none)
2075 {
2077  const char *Amode = (Sel.Opts & XrdCmsSelect::Write ? "write" : "read");
2078  const char *Xmode = (Sel.Opts & XrdCmsSelect::Online ? "immediately " : "");
2079 
// The "+ 1" adds the terminating null character to the reported length.
2080  if (none)
2081  {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
2082  "No servers are reachable via %s network to %s%s the file.",
2083  XrdNetIF::Name(nType), Xmode, Amode) + 1;
2084  } else {
2085  Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
2086  "Eligible server is unreachable via %s network to %s%s the file.",
2087  XrdNetIF::Name(nType), Xmode, Amode) + 1;
2088  }
// The error code travels in the Port field of the response.
2089  Sel.Resp.Port = kYR_ENETUNREACH;
2090  return EReplete;
2091 }
2092 
2093 /******************************************************************************/
2094 /* U n u s e a b l e */
2095 /******************************************************************************/
2096 
2097 int XrdCmsCluster::Unuseable(XrdCmsSelect &Sel)
2098 {
2099  const char *Amode = (Sel.Opts & XrdCmsSelect::Write ? "write" : "read");
2100  const char *Xmode = (Sel.Opts & XrdCmsSelect::Online ? "immediately " : "");
2101  const char *EType = (Sel.Opts & XrdCmsSelect::isDir ? "directory" : "file");
2102 
2103  int n = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data),
2104  "No servers are available to %s%s the %s.",
2105  Xmode, Amode, EType);
2106  if (n < (int)sizeof(Sel.Resp.Data)) Sel.Resp.DLen = n+1;
2107  else Sel.Resp.DLen = sizeof(Sel.Resp.Data);
2108 
2109  Sel.Resp.Port = kYR_ENOENT;
2110  return EReplete;
2111 }
void Usage(const char *msg)
Definition: XrdAccTest.cc:105
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define RefCount(sP, sPMulti, NeedSpace)
#define QTRACE(act)
Definition: XrdCmsTrace.hh:49
#define STMax
Definition: XrdCmsTypes.hh:39
unsigned long long SMask_t
Definition: XrdCmsTypes.hh:33
#define FULLMASK
Definition: XrdCmsTypes.hh:35
int stat(const char *path, struct stat *buf)
bool Debug
bool Exists
XrdOucString Path
struct myOpts opts
if(Avsz)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
static int Present(const char *hName, XrdOucTList *bList=0, char *rbuff=0, int rblen=0)
int GetFile(XrdCmsSelect &Sel, SMask_t mask)
Definition: XrdCmsCache.cc:232
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
Definition: XrdCmsCache.cc:117
XrdCmsPList_Anchor Paths
Definition: XrdCmsCache.hh:49
int UnkFile(XrdCmsSelect &Sel, SMask_t mask)
Definition: XrdCmsCache.cc:278
void Drop(SMask_t mask, int SNum, int xHi)
Definition: XrdCmsCache.cc:359
int WT4File(XrdCmsSelect &Sel, SMask_t mask)
Definition: XrdCmsCache.cc:306
static XrdCmsClustID * AddID(const char *cID)
static SMask_t Mask(const char *cID)
XrdCmsNode * RemNode(XrdCmsNode *nP)
static XrdCmsClustID * Find(const char *cID)
bool AddNode(XrdCmsNode *nP, bool isMan)
SMask_t getMask(const XrdNetAddr *addr)
void Space(XrdCms::SpaceData &sData, SMask_t smask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
void * MonPerf()
int Select(XrdCmsSelect &Sel)
int Locate(XrdCmsSelect &Sel)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
void * MonRefs()
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
void Remove(XrdCmsNode *theNode)
int Stats(char *bfr, int bln)
virtual void BlackList(XrdOucTList *blP)
int Statt(char *bfr, int bln)
static const int RepStat_shr
static const int RepStat_frq
char myRType[4]
XrdCmsDrop(int nid, int inst)
XrdCmsNode * nodeP
XrdCmsDrop(XrdCmsNode *nP)
unsigned int Hash
Definition: XrdCmsKey.hh:53
char * Val
Definition: XrdCmsKey.hh:52
short Len
Definition: XrdCmsKey.hh:54
char isPerm
Definition: XrdCmsNode.hh:73
int DiskMinF
Definition: XrdCmsNode.hh:89
int DiskFree
Definition: XrdCmsNode.hh:90
static const char allowsRW
Definition: XrdCmsNode.hh:84
char * Ident
Definition: XrdCmsNode.hh:61
void Ref()
Definition: XrdCmsNode.hh:179
char isConn
Definition: XrdCmsNode.hh:71
void Delete(XrdSysRWLock &gMutex)
Definition: XrdCmsNode.hh:129
void n2gLock(XrdSysRWLock &gMutex, bool rdlock=false)
Definition: XrdCmsNode.hh:168
int Send(const char *buff, int blen=0)
Definition: XrdCmsNode.hh:184
static const char allowsSS
Definition: XrdCmsNode.hh:85
char isGone
Definition: XrdCmsNode.hh:72
int DiskUtil
Definition: XrdCmsNode.hh:91
void unRef()
Definition: XrdCmsNode.hh:180
char isPeer
Definition: XrdCmsNode.hh:68
void Lock()
Definition: XrdCmsNode.hh:175
static const char isDisabled
Definition: XrdCmsNode.hh:80
char RoleID
Definition: XrdCmsNode.hh:75
int isNode(SMask_t smask)
Definition: XrdCmsNode.hh:145
SMask_t Mask()
Definition: XrdCmsNode.hh:160
char isBad
Definition: XrdCmsNode.hh:63
char isOffline
Definition: XrdCmsNode.hh:64
void g2nLock(XrdSysRWLock &gMutex)
Definition: XrdCmsNode.hh:162
static const char isSuspend
Definition: XrdCmsNode.hh:81
unsigned int DiskTotal
Definition: XrdCmsNode.hh:87
int ID(int &INum)
Definition: XrdCmsNode.hh:139
char isNoStage
Definition: XrdCmsNode.hh:66
void Disc(const char *reason=0, int needLock=1)
Definition: XrdCmsNode.cc:256
char isMan
Definition: XrdCmsNode.hh:67
void UnLock()
Definition: XrdCmsNode.hh:177
void setName(XrdLink *lnkp, const char *theIF, int port)
Definition: XrdCmsNode.cc:145
char isBound
Definition: XrdCmsNode.hh:69
static const char isDoomed
Definition: XrdCmsNode.hh:82
static const char isBlisted
Definition: XrdCmsNode.hh:79
char * Name()
Definition: XrdCmsNode.hh:158
char hasNet
Definition: XrdCmsNode.hh:62
SMask_t ssvec
Definition: XrdCmsPList.hh:49
SMask_t rovec
Definition: XrdCmsPList.hh:47
SMask_t rwvec
Definition: XrdCmsPList.hh:48
int Find(const char *pname, XrdCmsPInfo &masks)
Definition: XrdCmsPList.cc:77
SMask_t rwVec
Definition: XrdCmsRRQ.hh:59
void Statistics(Info &Data)
Definition: XrdCmsRRQ.hh:144
static const char * Type(RoleID rid)
Definition: XrdCmsRole.hh:78
struct XrdCmsSelect::@93 Resp
struct XrdCmsSelect::@92 Vec
XrdCmsRRQInfo * InfoP
Definition: XrdCmsSelect.hh:47
XrdCmsKey Path
Definition: XrdCmsSelect.hh:46
SMask_t nmask
Definition: XrdCmsSelect.hh:48
static const int IdentSize
char Ident[IdentSize]
XrdCmsSelected * next
const char * reason
void Update(StateType StateT, int ActivVal, int StageVal=0)
Definition: XrdCmsState.cc:258
void Set(int ncount)
Definition: XrdCmsState.cc:182
Definition: XrdJob.hh:43
static const int prefipv4
Use if mapped IPV4 actual format.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
@ fmtName
Hostname if it is resolvable o/w use fmtAddr.
int Port(int pNum=-1)
Definition: XrdNetAddr.cc:148
char Mask()
Definition: XrdNetIF.hh:288
int GetPublicName(char *nbuff, int &nport)
Definition: XrdNetIF.hh:150
int Port()
Definition: XrdNetIF.hh:322
bool HasDest(ifType ifT=PublicV6)
Definition: XrdNetIF.hh:267
static const char * Name(ifType ifT)
Definition: XrdNetIF.hh:312
int GetPublicDest(char *dest, size_t dlen)
Definition: XrdNetIF.cc:412
ifType
The enum that is used to index into ifData to get appropriate interface.
Definition: XrdNetIF.hh:65
@ PrivateIF
Definition: XrdNetIF.hh:69
static int Pack(struct iovec **, const char *, unsigned short &buff)
Definition: XrdOucPup.cc:52
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
static struct XrdCl::None none
ZipListImpl< false > List(Ctx< ZipArchive > zip)
Factory for creating ZipStatImpl objects.
XrdCmsRRQ RRQ
Definition: XrdCmsRRQ.cc:55
XrdCmsCache Cache
Definition: XrdCmsCache.cc:54
static const unsigned char kYR_Version
Definition: YProtocol.hh:80
kXR_unt16 datalen
Definition: YProtocol.hh:86
@ kYR_ENETUNREACH
Definition: YProtocol.hh:158
@ kYR_noReplicas
Definition: YProtocol.hh:164
@ kYR_ENOENT
Definition: YProtocol.hh:150
@ kYR_RWConflict
Definition: YProtocol.hh:163
static const int CMS_isSuper
static const int CMS_noStage
kXR_char modifier
Definition: YProtocol.hh:85
XrdScheduler * Sched
XrdCmsCluster Cluster
XrdCmsBaseFS baseFS
XrdSysError Say
XrdCmsState CmsState
Definition: XrdCmsState.cc:55
static const int CMS_isMan
XrdCmsConfig Config
@ kYR_raw
Definition: YProtocol.hh:132
@ kYR_disc
Definition: YProtocol.hh:103
@ kYR_try
Definition: YProtocol.hh:114
@ kYR_state
Definition: YProtocol.hh:110
@ kYR_usage
Definition: YProtocol.hh:116
static const int CMS_isPeer
static const int CMS_Suspend
int Opts
Definition: XrdMpxStats.cc:58
long long luSlow
Definition: XrdCmsRRQ.hh:139
long long rdSlow
Definition: XrdCmsRRQ.hh:141
long long Resp
Definition: XrdCmsRRQ.hh:136
long long luFast
Definition: XrdCmsRRQ.hh:138
long long Add2Q
Definition: XrdCmsRRQ.hh:134
long long Multi
Definition: XrdCmsRRQ.hh:137
long long rdFast
Definition: XrdCmsRRQ.hh:140
long long PBack
Definition: XrdCmsRRQ.hh:135