34 #include <sys/resource.h>
36 #include <sys/types.h>
39 #include <AvailabilityMacros.h>
48 #define XRD_TRACE XrdTrace->
55 const char *XrdScheduler::TraceID =
"Sched";
98 int minw,
int maxw,
int maxi)
99 :
XrdJob(
"underused thread monitor"),
100 XrdTraceOld(0), WorkAvail(0,
"sched work")
102 Boot(eP, tP, minw, maxw, maxi);
109 int minw,
int maxw,
int maxi)
110 :
XrdJob(
"underused thread monitor"),
111 XrdTraceOld(tP), WorkAvail(0,
"sched work")
124 :
XrdJob(
"underused thread monitor"),
125 XrdTraceOld(0), WorkAvail(0,
"sched work")
132 #if ( defined(__linux__) || defined(__GNU__) ) && defined(F_DUPFD_CLOEXEC)
133 eFD =
fcntl(STDERR_FILENO, F_DUPFD_CLOEXEC, 0);
135 eFD = dup(STDERR_FILENO);
136 fcntl(eFD, F_SETFD, FD_CLOEXEC);
151 Init(minw, maxw, maxi);
159 int minw,
int maxw,
int maxi)
165 Init(minw, maxw, maxi);
196 while(p && p != jp) {pp = p; p = p->
NextJob;}
203 TRACE(SCHED,
"time event " <<jp->
Comment <<
" cancelled");
217 int num_kill, num_idle;
222 {DispatchMutex.
Lock(); num_idle = idl_Workers; DispatchMutex.
UnLock();
223 num_kill = num_idle - min_Workers;
224 TRACE(SCHED, num_Workers <<
" threads; " <<num_idle <<
" idle");
226 {
if (num_kill > 1) num_kill = num_kill/2;
228 num_Layoffs = num_kill;
229 while(num_kill--) WorkAvail.
Post();
236 if (max_Workidl > 0)
Schedule((
XrdJob *)
this, max_Workidl+time(0));
247 static int retc, ReaperStarted = 0;
253 if ((pid = fork()) < 0)
254 {XrdLog->
Emsg(
"Scheduler",errno,
"fork to handle",
id);
257 if (!pid)
return pid;
263 retc = ReaperStarted;
271 0,
"Process reaper")))
272 {XrdLog->
Emsg(
"Scheduler", retc,
"create reaper thread");
288 #if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_5)
289 struct timespec
ts = { 1, 0 };
297 sigaddset(&Sset, SIGCHLD);
302 do {ReaperMutex.
Lock();
303 tp = firstPID; ptp = 0;
305 {
do {pid = waitpid(tp->
pid, &status, WNOHANG);}
306 while (pid < 0 && errno == EINTR);
309 xtp = tp; tp = tp->
next;
310 if (ptp) ptp->
next = tp;
313 }
else {ptp = tp; tp = tp->
next;}
316 #if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_5)
318 }
while (nanosleep(&
ts, 0) <= 0);
320 }
while(sigwait(&Sset, &signum) >= 0);
336 do {
do {DispatchMutex.
Lock(); idl_Workers++;DispatchMutex.
UnLock();
338 DispatchMutex.
Lock();waiting = --idl_Workers;DispatchMutex.
UnLock();
340 if ((jp = WorkFirst))
341 {
if (!(WorkFirst = jp->
NextJob)) WorkLast = 0;
342 if (num_JobsinQ) num_JobsinQ--;
343 else XrdLog->
Emsg(
"Scheduler",
"Job queue count underflow!");
350 TRACE(SCHED,
"terminating thread; workers=" <<num_Workers);
362 if (!waiting) hireWorker();
364 {
TRACE(SCHED,
"running " <<jp->
Comment <<
" inq=" <<num_JobsinQ);}
425 num_JobsinQ += numjobs;
430 while(numjobs--) WorkAvail.
Post();
450 {
TRACE(SCHED,
"scheduling " <<jp->
Comment <<
" in " <<atime-time(0) <<
" seconds");}
451 jp->SchedTime = atime;
457 while(p && p->SchedTime <= atime) {pp = p; p = p->
NextJob;}
463 else {TimerQueue = jp; TimerRings.
Signal();}
489 #if ( defined(__linux__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) ) && defined(RLIMIT_NPROC)
497 if ((pdFD =
open(
"/proc/sys/kernel/pid_max", O_RDONLY)) >= 0)
499 if ((rdsz =
read(pdFD, pmBuff,
sizeof(pmBuff))) > 0)
500 {rdsz = atoi(pmBuff);
501 if (rdsz < 16384) theMax = 16384;
503 theMax =
static_cast<rlim_t
>(rdsz-2000);
510 const bool setnp = (getenv(
"XRDLEAVENPROC") == 0);
515 if (setnp && !getrlimit(RLIMIT_NPROC, &rlim))
516 {
if (rlim.rlim_max == RLIM_INFINITY || rlim.rlim_max > theMax)
517 {
if (limlower || (rlim.rlim_cur != RLIM_INFINITY && rlim.rlim_cur < theMax))
518 {rlim.rlim_cur = theMax;
519 setrlimit(RLIMIT_NPROC, &rlim);
522 if (rlim.rlim_cur != rlim.rlim_max)
523 {rlim.rlim_cur = rlim.rlim_max;
524 setrlimit(RLIMIT_NPROC, &rlim);
531 if (!getrlimit(RLIMIT_NPROC, &rlim))
532 {
if (rlim.rlim_cur == RLIM_INFINITY || rlim.rlim_cur > theMax)
533 max_Workers =
static_cast<int>(theMax);
534 else max_Workers =
static_cast<int>(rlim.rlim_cur);
545 static int isSet = 0;
550 if (once && isSet) {SchedMutex.
UnLock();
return;}
555 if (maxw <= 0) maxw = max_Workers;
556 if (minw < 0) minw = min_Workers;
557 if (minw > maxw) minw = maxw;
558 if (avlw < 0) avlw = maxw/4*3;
559 else if (avlw > maxw) avlw = maxw;
565 stk_Workers = maxw - avlw;
566 if (maxi >=0) max_Workidl = maxi;
581 TRACE(SCHED,
"Set min_Workers=" <<min_Workers <<
" max_Workers=" <<max_Workers);
582 TRACE(SCHED,
"Set stk_Workers=" <<stk_Workers <<
" max_Workidl=" <<max_Workidl);
603 XrdLog->
Emsg(
"Scheduler", retc,
"create time scheduler thread");
607 if (max_Workidl > 0)
Schedule((
XrdJob *)
this, (time_t)max_Workidl+time(0));
611 if (!(numw = min_Workers/3)) numw = 2;
612 while(numw--) hireWorker(0);
616 TRACE(SCHED,
"Starting with " <<num_Workers <<
" workers" );
625 int cnt_Jobs, cnt_JobsinQ, xam_QLength, cnt_Workers, cnt_idl;
626 int cnt_TCreate, cnt_TDestroy, cnt_Limited;
627 static char statfmt[] =
"<stats id=\"sched\"><jobs>%d</jobs>"
628 "<inq>%d</inq><maxinq>%d</maxinq>"
629 "<threads>%d</threads><idle>%d</idle>"
630 "<tcr>%d</tcr><tde>%d</tde>"
631 "<tlimr>%d</tlimr></stats>";
635 if (!buff)
return sizeof(statfmt) + 16*8;
639 if (do_sync) DispatchMutex.
Lock();
640 cnt_idl = idl_Workers;
641 if (do_sync) DispatchMutex.
UnLock();
645 if (do_sync) SchedMutex.
Lock();
646 cnt_Workers = num_Workers;
648 cnt_JobsinQ = num_JobsinQ;
653 if (do_sync) SchedMutex.
UnLock();
657 return snprintf(buff, blen, statfmt, cnt_Jobs, cnt_JobsinQ, xam_QLength,
658 cnt_Workers, cnt_idl, cnt_TCreate, cnt_TDestroy,
673 do {TimerMutex.
Lock();
674 if (TimerQueue) wtime = TimerQueue->SchedTime-time(0);
678 TimerRings.
Wait(wtime);
695 void XrdScheduler::hireWorker(
int dotrace)
703 if (num_Workers >= max_Workers)
706 XrdLog->
Emsg(
"Scheduler",
"Thread limit has been reached!");
722 {
XrdLog->
Emsg(
"Scheduler", retc,
"create worker thread");
726 max_Workers = num_Workers;
727 min_Workers = (max_Workers/10 ? max_Workers/10 : 1);
728 stk_Workers = max_Workers/4*3;
730 }
else if (dotrace)
TRACE(SCHED,
"Now have " <<num_Workers <<
" workers" );
737 void XrdScheduler::Init(
int minw,
int maxw,
int maxi)
744 stk_Workers = maxw - (maxw/4*3);
753 WorkFirst = WorkLast = TimerQueue = 0;
760 void XrdScheduler::traceExit(pid_t pid,
int status)
764 if (WIFEXITED(status))
765 {retc = WEXITSTATUS(status);
766 why =
" exited with rc=";
767 }
else if (WIFSIGNALED(status))
768 {retc = WTERMSIG(status);
769 why =
" killed with signal ";
771 why =
" changed state ";
773 TRACE(SCHED,
"Process " <<pid <<why <<retc);
static std::string ts()
timestamp output for logging messages
XrdSysError XrdLog(0, "")
int open(const char *path, int oflag,...)
int fcntl(int fd, int cmd,...)
ssize_t read(int fildes, void *buf, size_t nbyte)
void * XrdStartTSched(void *carg)
void * XrdStartWorking(void *carg)
void * XrdStartReaper(void *carg)
#define XRDSYSTHREAD_BIND
friend class XrdScheduler
XrdSchedulerPID(pid_t newpid, XrdSchedulerPID *prev)
int Stats(char *buff, int blen, int do_sync=0)
void Schedule(XrdJob *jp)
void setParms(int minw, int maxw, int avlt, int maxi, int once=0)
void setNproc(const bool limlower)
pid_t Fork(const char *id)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)