00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Revision identification string for this source file.
const char *XrdSchedulerCVSID = "$Id: XrdScheduler.cc 34000 2010-06-21 06:49:56Z ganis $";
00014
00015 #include <errno.h>
00016 #include <signal.h>
00017 #include <stdio.h>
00018 #include <sys/types.h>
00019 #include <sys/wait.h>
00020 #ifdef __macos__
00021 #include <AvailabilityMacros.h>
00022 #endif
00023
00024 #include "Xrd/XrdJob.hh"
00025 #include "Xrd/XrdScheduler.hh"
00026 #include "Xrd/XrdTrace.hh"
00027 #include "XrdSys/XrdSysError.hh"
00028
00029
00030
00031
00032
00033 extern XrdSysError XrdLog;
00034
00035 #ifndef NODEBUG
00036 extern XrdOucTrace XrdTrace;
00037 #endif
00038
00039 const char *XrdScheduler::TraceID = "Sched";
00040
00041
00042
00043
00044
// Node of a singly linked list that records the process ID of a child
// created by XrdScheduler::Fork() until XrdScheduler::Reaper() collects it.
class XrdSchedulerPID
{
public:
    XrdSchedulerPID *next;   // Following node, or 0 at the end of the list
    pid_t            pid;    // Process ID remembered by this node

    // Build a node for `newpid` that is linked in front of `prev`.
    XrdSchedulerPID(pid_t newpid, XrdSchedulerPID *prev)
        : next(prev), pid(newpid)
    {
    }

    ~XrdSchedulerPID() {}
};
00054
00055
00056
00057
00058
00059 void *XrdStartReaper(void *carg)
00060 {XrdScheduler *sp = (XrdScheduler *)carg;
00061 sp->Reaper();
00062 return (void *)0;
00063 }
00064
00065 void *XrdStartTSched(void *carg)
00066 {XrdScheduler *sp = (XrdScheduler *)carg;
00067 sp->TimeSched();
00068 return (void *)0;
00069 }
00070
00071 void *XrdStartWorking(void *carg)
00072 {XrdScheduler *sp = (XrdScheduler *)carg;
00073 sp->Run();
00074 return (void *)0;
00075 }
00076
00077
00078
00079
00080
00081 XrdScheduler::XrdScheduler(int minw, int maxw, int maxi)
00082 : XrdJob("underused thread monitor"),
00083 WorkAvail(0, "sched work")
00084 {
00085 min_Workers = minw;
00086 max_Workers = maxw;
00087 max_Workidl = maxi;
00088 num_Workers = 0;
00089 num_JobsinQ = 0;
00090 stk_Workers = maxw - (maxw/4*3);
00091 idl_Workers = 0;
00092 num_Jobs = 0;
00093 max_QLength = 0;
00094 num_TCreate = 0;
00095 num_TDestroy= 0;
00096 num_Limited = 0;
00097 firstPID = 0;
00098 WorkFirst = WorkLast = TimerQueue = 0;
00099 }
00100
00101
00102
00103
00104
00105 XrdScheduler::~XrdScheduler()
00106 {
00107 }
00108
00109
00110
00111
00112
00113 void XrdScheduler::Cancel(XrdJob *jp)
00114 {
00115 XrdJob *p, *pp = 0;
00116
00117
00118
00119 TimerMutex.Lock();
00120
00121
00122
00123 p = TimerQueue;
00124 while(p && p != jp) {pp = p; p = p->NextJob;}
00125
00126
00127
00128 if (p)
00129 {if (pp) pp->NextJob = p->NextJob;
00130 else TimerQueue = p->NextJob;
00131 TRACE(SCHED, "time event " <<jp->Comment <<" cancelled");
00132 }
00133
00134
00135
00136 TimerMutex.UnLock();
00137 }
00138
00139
00140
00141
00142
00143 void XrdScheduler::DoIt()
00144 {
00145 int num_kill, num_idle;
00146
00147
00148
00149 if (!num_JobsinQ)
00150 {DispatchMutex.Lock(); num_idle = idl_Workers; DispatchMutex.UnLock();
00151 num_kill = num_idle - min_Workers;
00152 TRACE(SCHED, num_Workers <<" threads; " <<num_idle <<" idle");
00153 if (num_kill > 0)
00154 {if (num_kill > 1) num_kill = num_kill/2;
00155 SchedMutex.Lock();
00156 num_Layoffs = num_kill;
00157 while(num_kill--) WorkAvail.Post();
00158 SchedMutex.UnLock();
00159 }
00160 }
00161
00162
00163
00164 if (max_Workidl > 0) Schedule((XrdJob *)this, max_Workidl+time(0));
00165 }
00166
00167
00168
00169
00170
00171
00172
00173 pid_t XrdScheduler::Fork(const char *id)
00174 {
00175 static int retc, ReaperStarted = 0;
00176 pthread_t tid;
00177 pid_t pid;
00178
00179
00180
00181 if ((pid = fork()) < 0)
00182 {XrdLog.Emsg("Scheduler",errno,"fork to handle",id);
00183 return pid;
00184 }
00185 if (!pid) return pid;
00186
00187
00188
00189 ReaperMutex.Lock();
00190 firstPID = new XrdSchedulerPID(pid, firstPID);
00191 retc = ReaperStarted;
00192 ReaperStarted = 1;
00193 ReaperMutex.UnLock();
00194
00195
00196
00197 if (!retc)
00198 if ((retc = XrdSysThread::Run(&tid, XrdStartReaper, (void *)this,
00199 0, "Process reaper")))
00200 {XrdLog.Emsg("Scheduler", retc, "create reaper thread");
00201 ReaperStarted = 0;
00202 }
00203
00204 return pid;
00205 }
00206
00207
00208
00209
00210
00211 void *XrdScheduler::Reaper()
00212 {
00213 int status;
00214 pid_t pid;
00215 XrdSchedulerPID *tp, *ptp, *xtp;
00216 #if defined(__macos__) && !defined(MAC_OS_X_VERSION_10_5)
00217 struct timespec ts = { 1, 0 };
00218 #else
00219 sigset_t Sset;
00220 int signum;
00221
00222
00223
00224 sigemptyset(&Sset);
00225 sigaddset(&Sset, SIGCHLD);
00226 #endif
00227
00228
00229
00230 do {ReaperMutex.Lock();
00231 tp = firstPID; ptp = 0;
00232 while(tp)
00233 {do {pid = waitpid(tp->pid, &status, WNOHANG);}
00234 while (pid < 0 && errno == EINTR);
00235 if (pid > 0)
00236 {if (TRACING(TRACE_SCHED)) traceExit(pid, status);
00237 xtp = tp; tp = tp->next;
00238 if (ptp) ptp->next = tp;
00239 else firstPID = tp;
00240 delete xtp;
00241 } else {ptp = tp; tp = tp->next;}
00242 }
00243 ReaperMutex.UnLock();
00244 #if defined(__macos__) && !defined(MAC_OS_X_VERSION_10_5)
00245
00246 } while (nanosleep(&ts, 0) <= 0);
00247 #else
00248 } while(sigwait(&Sset, &signum) >= 0);
00249 #endif
00250 return (void *)0;
00251 }
00252
00253
00254
00255
00256
00257 void XrdScheduler::Run()
00258 {
00259 int waiting;
00260 XrdJob *jp;
00261
00262
00263
00264 do {do {DispatchMutex.Lock(); idl_Workers++;DispatchMutex.UnLock();
00265 WorkAvail.Wait();
00266 DispatchMutex.Lock();waiting = --idl_Workers;DispatchMutex.UnLock();
00267 SchedMutex.Lock();
00268 if ((jp = WorkFirst))
00269 {if (!(WorkFirst = jp->NextJob)) WorkLast = 0;
00270 if (num_JobsinQ) num_JobsinQ--;
00271 else XrdLog.Emsg("Scheduler","Job queue count underflow!");
00272 } else {
00273 num_JobsinQ = 0;
00274 if (num_Layoffs > 0)
00275 {num_Layoffs--;
00276 if (waiting)
00277 {num_TDestroy++; num_Workers--;
00278 TRACE(SCHED, "terminating thread; workers=" <<num_Workers);
00279 SchedMutex.UnLock();
00280 return;
00281 }
00282 }
00283 }
00284 SchedMutex.UnLock();
00285 } while(!jp);
00286
00287
00288
00289
00290 if (!waiting) hireWorker();
00291 TRACE(SCHED, "running " <<jp->Comment <<" inq=" <<num_JobsinQ);
00292 jp->DoIt();
00293 } while(1);
00294 }
00295
00296
00297
00298
00299
00300 void XrdScheduler::Schedule(XrdJob *jp)
00301 {
00302
00303
00304 SchedMutex.Lock();
00305
00306
00307
00308 jp->NextJob = 0;
00309 if (WorkFirst)
00310 {WorkLast->NextJob = jp;
00311 WorkLast = jp;
00312 } else {
00313 WorkFirst = jp;
00314 WorkLast = jp;
00315 }
00316 WorkAvail.Post();
00317
00318
00319
00320 num_Jobs++;
00321 num_JobsinQ++;
00322 if (num_JobsinQ > max_QLength) max_QLength = num_JobsinQ;
00323
00324
00325
00326 SchedMutex.UnLock();
00327 }
00328
00329
00330
00331 void XrdScheduler::Schedule(int numjobs, XrdJob *jfirst, XrdJob *jlast)
00332 {
00333
00334
00335
00336 SchedMutex.Lock();
00337
00338
00339
00340 jlast->NextJob = 0;
00341 if (WorkFirst)
00342 {WorkLast->NextJob = jfirst;
00343 WorkLast = jlast;
00344 } else {
00345 WorkFirst = jfirst;
00346 WorkLast = jlast;
00347 }
00348
00349
00350
00351 num_Jobs += numjobs;
00352 num_JobsinQ += numjobs;
00353 if (num_JobsinQ > max_QLength) max_QLength = num_JobsinQ;
00354
00355
00356
00357 while(numjobs--) WorkAvail.Post();
00358
00359
00360
00361 SchedMutex.UnLock();
00362 }
00363
00364
00365
00366 void XrdScheduler::Schedule(XrdJob *jp, time_t atime)
00367 {
00368 XrdJob *pp = 0, *p;
00369
00370
00371
00372 Cancel(jp);
00373
00374
00375
00376 TRACE(SCHED, "scheduling " <<jp->Comment <<" in " <<atime-time(0) <<" seconds");
00377 jp->SchedTime = atime;
00378 TimerMutex.Lock();
00379
00380
00381
00382 p = TimerQueue;
00383 while(p && p->SchedTime <= atime) {pp = p; p = p->NextJob;}
00384
00385
00386
00387 jp->NextJob = p;
00388 if (pp) pp->NextJob = jp;
00389 else {TimerQueue = jp; TimerRings.Signal();}
00390
00391
00392
00393 TimerMutex.UnLock();
00394 }
00395
00396
00397
00398
00399
00400 void XrdScheduler::setParms(int minw, int maxw, int avlw, int maxi, int once)
00401 {
00402 static int isSet = 0;
00403
00404
00405
00406 SchedMutex.Lock();
00407 if (once && isSet) {SchedMutex.UnLock(); return;}
00408 isSet = 1;
00409
00410
00411
00412 if (maxw <= 0) maxw = max_Workers;
00413 if (minw < 0) minw = (maxw/10 ? maxw/10 : 1);
00414 else if (minw > maxw) minw = maxw;
00415 if (avlw < 0) avlw = maxw/4*3;
00416 else if (avlw > maxw) avlw = maxw;
00417
00418
00419
00420 min_Workers = minw;
00421 max_Workers = maxw;
00422 stk_Workers = maxw - avlw;
00423 if (maxi >=0) max_Workidl = maxi;
00424
00425
00426
00427 SchedMutex.UnLock();
00428
00429
00430
00431 if (maxi > 0)
00432 {Cancel((XrdJob *)this);
00433 Schedule((XrdJob *)this, (time_t)maxi+time(0));
00434 }
00435
00436
00437
00438 TRACE(SCHED,"Set min_Workers=" <<min_Workers <<" max_Workers=" <<max_Workers);
00439 TRACE(SCHED,"Set stk_Workers=" <<stk_Workers <<" max_Workidl=" <<max_Workidl);
00440 }
00441
00442
00443
00444
00445
00446 void XrdScheduler::Start()
00447 {
00448 int retc, numw;
00449 pthread_t tid;
00450
00451
00452
00453 if ((retc = XrdSysThread::Run(&tid, XrdStartTSched, (void *)this,
00454 XRDSYSTHREAD_BIND, "Time scheduler")))
00455 XrdLog.Emsg("Scheduler", retc, "create time scheduler thread");
00456
00457
00458
00459 if (max_Workidl > 0) Schedule((XrdJob *)this, (time_t)max_Workidl+time(0));
00460
00461
00462
00463 if (!(numw = min_Workers/3)) numw = 2;
00464 while(numw--) hireWorker(0);
00465
00466
00467
00468 TRACE(SCHED, "Starting with " <<num_Workers <<" workers" );
00469 }
00470
00471
00472
00473
00474
00475 int XrdScheduler::Stats(char *buff, int blen, int do_sync)
00476 {
00477 int cnt_Jobs, cnt_JobsinQ, xam_QLength, cnt_Workers, cnt_idl;
00478 int cnt_TCreate, cnt_TDestroy, cnt_Limited;
00479 static char statfmt[] = "<stats id=\"sched\"><jobs>%d</jobs>"
00480 "<inq>%d</inq><maxinq>%d</maxinq>"
00481 "<threads>%d</threads><idle>%d</idle>"
00482 "<tcr>%d</tcr><tde>%d</tde>"
00483 "<tlimr>%d</tlimr></stats>";
00484
00485
00486
00487 if (!buff) return sizeof(statfmt) + 16*8;
00488
00489
00490
00491 if (do_sync) DispatchMutex.Lock();
00492 cnt_idl = idl_Workers;
00493 if (do_sync) DispatchMutex.UnLock();
00494
00495
00496
00497 if (do_sync) SchedMutex.Lock();
00498 cnt_Workers = num_Workers;
00499 cnt_Jobs = num_Jobs;
00500 cnt_JobsinQ = num_JobsinQ;
00501 xam_QLength = max_QLength;
00502 cnt_TCreate = num_TCreate;
00503 cnt_TDestroy= num_TDestroy;
00504 cnt_Limited = num_Limited;
00505 if (do_sync) SchedMutex.UnLock();
00506
00507
00508
00509 return snprintf(buff, blen, statfmt, cnt_Jobs, cnt_JobsinQ, xam_QLength,
00510 cnt_Workers, cnt_idl, cnt_TCreate, cnt_TDestroy,
00511 cnt_Limited);
00512 }
00513
00514
00515
00516
00517
00518 void XrdScheduler::TimeSched()
00519 {
00520 XrdJob *jp;
00521 int wtime;
00522
00523
00524
00525 do {TimerMutex.Lock();
00526 if (TimerQueue) wtime = TimerQueue->SchedTime-time(0);
00527 else wtime = 60*60;
00528 if (wtime > 0)
00529 {TimerMutex.UnLock();
00530 TimerRings.Wait(wtime);
00531 } else {
00532 jp = TimerQueue;
00533 TimerQueue = jp->NextJob;
00534 Schedule(jp);
00535 TimerMutex.UnLock();
00536 }
00537 } while(1);
00538 }
00539
00540
00541
00542
00543
00544
00545
00546
00547 void XrdScheduler::hireWorker(int dotrace)
00548 {
00549 pthread_t tid;
00550 int retc;
00551
00552
00553
00554 SchedMutex.Lock();
00555 if (num_Workers >= max_Workers)
00556 {num_Limited++;
00557 if ((num_Limited & 4095) == 1)
00558 XrdLog.Emsg("Scheduler","Thread limit has been reached!");
00559 SchedMutex.UnLock();
00560 return;
00561 }
00562 num_Workers++;
00563 num_TCreate++;
00564 SchedMutex.UnLock();
00565
00566
00567
00568
00569 retc = XrdSysThread::Run(&tid, XrdStartWorking, (void *)this, 0, "Worker");
00570
00571
00572
00573 if (retc)
00574 {XrdLog.Emsg("Scheduler", retc, "create worker thread");
00575 SchedMutex.Lock();
00576 num_Workers--;
00577 num_TCreate--;
00578 max_Workers = num_Workers;
00579 min_Workers = (max_Workers/10 ? max_Workers/10 : 1);
00580 stk_Workers = max_Workers/4*3;
00581 SchedMutex.UnLock();
00582 } else if (dotrace) TRACE(SCHED, "Now have " <<num_Workers <<" workers" );
00583 }
00584
00585
00586
00587
00588
00589 void XrdScheduler::traceExit(pid_t pid, int status)
00590 { const char *why;
00591 int retc;
00592
00593 if (WIFEXITED(status))
00594 {retc = WEXITSTATUS(status);
00595 why = " exited with rc=";
00596 } else if (WIFSIGNALED(status))
00597 {retc = WTERMSIG(status);
00598 why = " killed with signal ";
00599 } else {retc = 0;
00600 why = " changed state ";
00601 }
00602 TRACE(SCHED, "Process " <<pid <<why <<retc);
00603 }