XrdScheduler.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
/*                       X r d S c h e d u l e r . c c                        */
00004 /*                                                                            */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*       All Rights Reserved. See XrdInfo.cc for complete License Terms       */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC03-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
//        $Id: XrdScheduler.cc 34000 2010-06-21 06:49:56Z ganis $

// Version-control identification string for this source file (an externally
// visible global, presumably kept so the revision can be found in the binary).
const char *XrdSchedulerCVSID = "$Id: XrdScheduler.cc 34000 2010-06-21 06:49:56Z ganis $";
00014 
00015 #include <errno.h>
00016 #include <signal.h>
00017 #include <stdio.h>
00018 #include <sys/types.h>
00019 #include <sys/wait.h>
00020 #ifdef __macos__
00021 #include <AvailabilityMacros.h>
00022 #endif
00023 
00024 #include "Xrd/XrdJob.hh"
00025 #include "Xrd/XrdScheduler.hh"
00026 #include "Xrd/XrdTrace.hh"
00027 #include "XrdSys/XrdSysError.hh"
00028 
00029 /******************************************************************************/
00030 /*                        G l o b a l   O b j e c t s                         */
00031 /******************************************************************************/
00032 
// Process-wide message logger, defined elsewhere; used below for Emsg() output.
extern XrdSysError   XrdLog;
  
#ifndef NODEBUG
// Trace object defined elsewhere; presumably consulted by the TRACE/TRACING
// macros from XrdTrace.hh. Not declared when NODEBUG is defined.
extern XrdOucTrace   XrdTrace;
#endif

// Identifier for this class's trace output (presumably picked up by TRACE).
       const char   *XrdScheduler::TraceID = "Sched";
00040 
00041 /******************************************************************************/
00042 /*                         L o c a l   C l a s s e s                          */
00043 /******************************************************************************/
00044 
// Node of the singly-linked list of forked child processes. Fork() pushes a
// new node onto the head of the list; Reaper() unlinks and deletes nodes once
// the corresponding child has been collected.
class XrdSchedulerPID
{
public:
    XrdSchedulerPID *next;   // next (older) entry, or 0 at the end of the list
    pid_t            pid;    // process id of the forked child

    // newpid - child process id to remember
    // prev   - current list head, which becomes this node's successor
    XrdSchedulerPID(pid_t newpid, XrdSchedulerPID *prev)
        : next(prev), pid(newpid)
    {}

   ~XrdSchedulerPID() {}
};
00054   
00055 /******************************************************************************/
00056 /*            E x t e r n a l   T h r e a d   I n t e r f a c e s             */
00057 /******************************************************************************/
00058   
00059 void *XrdStartReaper(void *carg)
00060       {XrdScheduler *sp = (XrdScheduler *)carg;
00061        sp->Reaper();
00062        return (void *)0;
00063       }
00064 
00065 void *XrdStartTSched(void *carg)
00066       {XrdScheduler *sp = (XrdScheduler *)carg;
00067        sp->TimeSched();
00068        return (void *)0;
00069       }
00070 
00071 void *XrdStartWorking(void *carg)
00072       {XrdScheduler *sp = (XrdScheduler *)carg;
00073        sp->Run();
00074        return (void *)0;
00075       }
00076 
00077 /******************************************************************************/
00078 /*                           C o n s t r u c t o r                            */
00079 /******************************************************************************/
00080   
00081 XrdScheduler::XrdScheduler(int minw, int maxw, int maxi)
00082               : XrdJob("underused thread monitor"),
00083                 WorkAvail(0, "sched work")
00084 {
00085     min_Workers =  minw;
00086     max_Workers =  maxw;
00087     max_Workidl =  maxi;
00088     num_Workers =  0;
00089     num_JobsinQ =  0;
00090     stk_Workers =  maxw - (maxw/4*3);
00091     idl_Workers =  0;
00092     num_Jobs    =  0;
00093     max_QLength =  0;
00094     num_TCreate =  0;
00095     num_TDestroy=  0;
00096     num_Limited =  0;
00097     firstPID    =  0;
00098     WorkFirst = WorkLast = TimerQueue = 0;
00099 }
00100  
00101 /******************************************************************************/
00102 /*                            D e s t r u c t o r                             */
00103 /******************************************************************************/
00104 
// Destructor. Intentionally empty: the scheduler object is never deleted, so
// nothing (threads, queues, child-process list) is released here.
XrdScheduler::~XrdScheduler()  // The scheduler is never deleted!
{
}
00108  
00109 /******************************************************************************/
00110 /*                                C a n c e l                                 */
00111 /******************************************************************************/
00112 
00113 void XrdScheduler::Cancel(XrdJob *jp)
00114 {
00115    XrdJob *p, *pp = 0;
00116 
00117 // Lock the queue
00118 //
00119    TimerMutex.Lock();
00120 
00121 // Find the matching job, if any
00122 //
00123    p = TimerQueue;
00124    while(p && p != jp) {pp = p; p = p->NextJob;}
00125 
00126 // Delete the job element
00127 //
00128    if (p)
00129       {if (pp) pp->NextJob = p->NextJob;
00130           else  TimerQueue = p->NextJob;
00131        TRACE(SCHED, "time event " <<jp->Comment <<" cancelled");
00132       }
00133 
00134 // All done
00135 //
00136    TimerMutex.UnLock();
00137 }
00138   
00139 /******************************************************************************/
00140 /*                                  D o I t                                   */
00141 /******************************************************************************/
00142 
00143 void XrdScheduler::DoIt()
00144 {
00145    int num_kill, num_idle;
00146 
00147 // Now check if there are too many idle threads (kill them if there are)
00148 //
00149    if (!num_JobsinQ)
00150       {DispatchMutex.Lock(); num_idle = idl_Workers; DispatchMutex.UnLock();
00151        num_kill = num_idle - min_Workers;
00152        TRACE(SCHED, num_Workers <<" threads; " <<num_idle <<" idle");
00153        if (num_kill > 0)
00154           {if (num_kill > 1) num_kill = num_kill/2;
00155            SchedMutex.Lock();
00156            num_Layoffs = num_kill;
00157            while(num_kill--) WorkAvail.Post();
00158            SchedMutex.UnLock();
00159           }
00160       }
00161 
00162 // Check if we should reschedule ourselves
00163 //
00164    if (max_Workidl > 0) Schedule((XrdJob *)this, max_Workidl+time(0));
00165 }
00166 
00167 /******************************************************************************/
00168 /*                                  F o r k                                   */
00169 /******************************************************************************/
00170   
00171 // This entry exists solely so that we can start a reaper thread for processes
00172 //
00173 pid_t XrdScheduler::Fork(const char *id)
00174 {
00175    static int  retc, ReaperStarted = 0;
00176    pthread_t tid;
00177    pid_t pid;
00178 
00179 // Fork
00180 //
00181    if ((pid = fork()) < 0)
00182       {XrdLog.Emsg("Scheduler",errno,"fork to handle",id);
00183        return pid;
00184       }
00185    if (!pid) return pid;
00186 
00187 // Obtain the status of the reaper thread.
00188 //
00189    ReaperMutex.Lock();
00190    firstPID = new XrdSchedulerPID(pid, firstPID);
00191    retc = ReaperStarted;
00192    ReaperStarted = 1;
00193    ReaperMutex.UnLock();
00194 
00195 // Start the reaper thread if it has not started.
00196 //
00197    if (!retc)
00198       if ((retc = XrdSysThread::Run(&tid, XrdStartReaper, (void *)this,
00199                                     0, "Process reaper")))
00200          {XrdLog.Emsg("Scheduler", retc, "create reaper thread");
00201           ReaperStarted = 0;
00202          }
00203 
00204    return pid;
00205 }
00206 
00207 /******************************************************************************/
00208 /*                                R e a p e r                                 */
00209 /******************************************************************************/
00210   
// Body of the "Process reaper" thread started from Fork(). Repeatedly scans
// the list of forked children (firstPID) under ReaperMutex, collecting any
// that have terminated with a non-blocking waitpid(). For each collected
// child it traces the exit status (when SCHED tracing is on) and unlinks and
// deletes the list entry. Between scans it either sleeps one second (old Mac
// OS X, where sigwait() is unreliable) or blocks in sigwait() for SIGCHLD.
// In practice the loop does not terminate.
void *XrdScheduler::Reaper()
{
   int status;
   pid_t pid;
   XrdSchedulerPID *tp, *ptp, *xtp;
#if defined(__macos__) && !defined(MAC_OS_X_VERSION_10_5)
   struct timespec ts = { 1, 0 };
#else
   sigset_t Sset;
   int signum;

// Set up for signal handling. Note: main() must block SIGCHLD at startup so
// that sigwait() below can receive it.
//
   sigemptyset(&Sset);
   sigaddset(&Sset, SIGCHLD);
#endif

// Wait for all outstanding children
//
   do {ReaperMutex.Lock();
       tp = firstPID; ptp = 0;
       while(tp)
            {// Poll this child without blocking; retry if interrupted
             do {pid = waitpid(tp->pid, &status, WNOHANG);}
                while (pid < 0 && errno == EINTR);
             if (pid > 0)
                {// Child collected: trace it, then unlink and free its entry
                 if (TRACING(TRACE_SCHED)) traceExit(pid, status);
                 xtp = tp; tp = tp->next;
                 if (ptp) ptp->next = tp;
                    else   firstPID = tp;
                 delete xtp;
                } else {ptp = tp; tp = tp->next;}
             // NOTE(review): an entry whose waitpid() keeps failing (e.g. with
             // ECHILD because the child was reaped elsewhere) is never removed.
             }
       ReaperMutex.UnLock();
#if defined(__macos__) && !defined(MAC_OS_X_VERSION_10_5)
       // Mac OS X sigwait() is broken on <= 10.4.
       // nanosleep() returns 0 or -1, so this condition is always true.
      } while (nanosleep(&ts, 0) <= 0);
#else
      // sigwait() returns 0 or a positive error number, so this is always true.
      } while(sigwait(&Sset, &signum) >= 0);
#endif
   return (void *)0;
}
00252 
00253 /******************************************************************************/
00254 /*                                   R u n                                    */
00255 /******************************************************************************/
00256   
// Body of every worker thread (started via XrdStartWorking from hireWorker()).
// Loops forever: count itself idle, wait on WorkAvail, then dequeue the head
// of the work queue. A wake-up that finds the queue empty is treated as a
// layoff request from DoIt(); the thread then exits, but only if some other
// worker is still idle. Before running a job, a new worker is hired when no
// other worker remains idle.
void XrdScheduler::Run()
{
   int waiting;
   XrdJob *jp;

// Wait for work then do it (an endless task for a worker thread)
//
   do {do {DispatchMutex.Lock();          idl_Workers++;DispatchMutex.UnLock();
           WorkAvail.Wait();
           // waiting = number of other workers that are still idle
           DispatchMutex.Lock();waiting = --idl_Workers;DispatchMutex.UnLock();
           SchedMutex.Lock();
           if ((jp = WorkFirst))
              {// Dequeue the head job and keep the queue counter consistent
               if (!(WorkFirst = jp->NextJob)) WorkLast = 0;
               if (num_JobsinQ) num_JobsinQ--;
                  else XrdLog.Emsg("Scheduler","Job queue count underflow!");
              } else {
               // Woken with no work: resync the counter and honor any layoff
               num_JobsinQ = 0;
               if (num_Layoffs > 0)
                  {num_Layoffs--;
                   // Exit only if another worker stays idle to take new work
                   if (waiting)
                      {num_TDestroy++; num_Workers--;
                       TRACE(SCHED, "terminating thread; workers=" <<num_Workers);
                       SchedMutex.UnLock();
                       return;
                      }
                  }
              }
           SchedMutex.UnLock();
          } while(!jp);

    // Check if we should hire a new worker (we always want 1 idle thread)
    // before running this job.
    // NOTE(review): num_JobsinQ is read below without SchedMutex (trace only).
    //
       if (!waiting) hireWorker();
       TRACE(SCHED, "running " <<jp->Comment <<" inq=" <<num_JobsinQ);
       jp->DoIt();
      } while(1);
}
00295  
00296 /******************************************************************************/
00297 /*                              S c h e d u l e                               */
00298 /******************************************************************************/
00299   
00300 void XrdScheduler::Schedule(XrdJob *jp)
00301 {
00302 // Lock down our data area
00303 //
00304    SchedMutex.Lock();
00305 
00306 // Place the request on the queue and broadcast it
00307 //
00308    jp->NextJob  = 0;
00309    if (WorkFirst)
00310       {WorkLast->NextJob = jp;
00311        WorkLast = jp;
00312       } else {
00313        WorkFirst = jp;
00314        WorkLast  = jp;
00315       }
00316    WorkAvail.Post();
00317 
00318 // Calculate statistics
00319 //
00320    num_Jobs++;
00321    num_JobsinQ++;
00322    if (num_JobsinQ > max_QLength) max_QLength = num_JobsinQ;
00323 
00324 // Unlock the data area and return
00325 //
00326    SchedMutex.UnLock();
00327 }
00328 
00329 /******************************************************************************/
00330   
00331 void XrdScheduler::Schedule(int numjobs, XrdJob *jfirst, XrdJob *jlast)
00332 {
00333 
00334 // Lock down our data area
00335 //
00336    SchedMutex.Lock();
00337 
00338 // Place the request list on the queue
00339 //
00340    jlast->NextJob = 0;
00341    if (WorkFirst)
00342       {WorkLast->NextJob = jfirst;
00343        WorkLast = jlast;
00344       } else {
00345        WorkFirst = jfirst;
00346        WorkLast  = jlast;
00347       }
00348 
00349 // Calculate statistics
00350 //
00351    num_Jobs    += numjobs;
00352    num_JobsinQ += numjobs;
00353    if (num_JobsinQ > max_QLength) max_QLength = num_JobsinQ;
00354 
00355 // Indicate number of jobs to work on
00356 //
00357    while(numjobs--) WorkAvail.Post();
00358 
00359 // Unlock the data area and return
00360 //
00361    SchedMutex.UnLock();
00362 }
00363 
00364 /******************************************************************************/
00365 
00366 void XrdScheduler::Schedule(XrdJob *jp, time_t atime)
00367 {
00368    XrdJob *pp = 0, *p;
00369 
00370 // Cancel this event, if scheduled
00371 //
00372    Cancel(jp);
00373 
00374 // Lock the queue
00375 //
00376    TRACE(SCHED, "scheduling " <<jp->Comment <<" in " <<atime-time(0) <<" seconds");
00377    jp->SchedTime = atime;
00378    TimerMutex.Lock();
00379 
00380 // Find the insertion point for the work element
00381 //
00382    p = TimerQueue;
00383    while(p && p->SchedTime <= atime) {pp = p; p = p->NextJob;}
00384 
00385 // Insert the job element
00386 //
00387    jp->NextJob = p;
00388    if (pp)  pp->NextJob = jp;
00389       else {TimerQueue = jp; TimerRings.Signal();}
00390 
00391 // All done
00392 //
00393    TimerMutex.UnLock();
00394 }
00395 
00396 /******************************************************************************/
00397 /*                              s e t P a r m s                               */
00398 /******************************************************************************/
00399   
00400 void XrdScheduler::setParms(int minw, int maxw, int avlw, int maxi, int once)
00401 {
00402    static int isSet = 0;
00403 
00404 // Lock the data area and check for 1-time set
00405 //
00406    SchedMutex.Lock();
00407    if (once && isSet) {SchedMutex.UnLock(); return;}
00408    isSet = 1;
00409 
00410 // get a consistent view of all the values
00411 //
00412    if (maxw <= 0) maxw = max_Workers;
00413    if (minw < 0) minw = (maxw/10 ? maxw/10 : 1);
00414       else if (minw > maxw) minw = maxw;
00415    if (avlw < 0) avlw = maxw/4*3;
00416       else if (avlw > maxw) avlw = maxw;
00417 
00418 // Set the values
00419 //
00420    min_Workers = minw;
00421    max_Workers = maxw;
00422    stk_Workers = maxw - avlw;
00423    if (maxi >=0)  max_Workidl = maxi;
00424 
00425 // Unlock the data area
00426 //
00427    SchedMutex.UnLock();
00428 
00429 // If we have an idle interval, schedule the idle check
00430 //
00431    if (maxi > 0)
00432       {Cancel((XrdJob *)this);
00433        Schedule((XrdJob *)this, (time_t)maxi+time(0));
00434       }
00435 
00436 // Debug the info
00437 //
00438    TRACE(SCHED,"Set min_Workers=" <<min_Workers <<" max_Workers=" <<max_Workers);
00439    TRACE(SCHED,"Set stk_Workers=" <<stk_Workers <<" max_Workidl=" <<max_Workidl);
00440 }
00441 
00442 /******************************************************************************/
00443 /*                                 S t a r t                                  */
00444 /******************************************************************************/
00445   
00446 void XrdScheduler::Start() // Serialized one time call!
00447 {
00448     int retc, numw;
00449     pthread_t tid;
00450 
00451 // Start a time based scheduler
00452 //
00453    if ((retc = XrdSysThread::Run(&tid, XrdStartTSched, (void *)this,
00454                                  XRDSYSTHREAD_BIND, "Time scheduler")))
00455       XrdLog.Emsg("Scheduler", retc, "create time scheduler thread");
00456 
00457 // If we an idle interval, schedule the idle check
00458 //
00459    if (max_Workidl > 0) Schedule((XrdJob *)this, (time_t)max_Workidl+time(0));
00460 
00461 // Start 1/3 of the minimum number of threads
00462 //
00463    if (!(numw = min_Workers/3)) numw = 2;
00464    while(numw--) hireWorker(0);
00465 
00466 // Unlock the data area
00467 //
00468    TRACE(SCHED, "Starting with " <<num_Workers <<" workers" );
00469 }
00470 
00471 /******************************************************************************/
00472 /*                                 S t a t s                                  */
00473 /******************************************************************************/
00474   
00475 int XrdScheduler::Stats(char *buff, int blen, int do_sync)
00476 {
00477     int cnt_Jobs, cnt_JobsinQ, xam_QLength, cnt_Workers, cnt_idl;
00478     int cnt_TCreate, cnt_TDestroy, cnt_Limited;
00479     static char statfmt[] = "<stats id=\"sched\"><jobs>%d</jobs>"
00480                 "<inq>%d</inq><maxinq>%d</maxinq>"
00481                 "<threads>%d</threads><idle>%d</idle>"
00482                 "<tcr>%d</tcr><tde>%d</tde>"
00483                 "<tlimr>%d</tlimr></stats>";
00484 
00485 // If only length wanted, do so
00486 //
00487    if (!buff) return sizeof(statfmt) + 16*8;
00488 
00489 // Get values protected by the Dispatch lock (avoid lock if no sync needed)
00490 //
00491    if (do_sync) DispatchMutex.Lock();
00492    cnt_idl = idl_Workers;
00493    if (do_sync) DispatchMutex.UnLock();
00494 
00495 // Get values protected by the Scheduler lock (avoid lock if no sync needed)
00496 //
00497    if (do_sync) SchedMutex.Lock();
00498    cnt_Workers = num_Workers;
00499    cnt_Jobs    = num_Jobs;
00500    cnt_JobsinQ = num_JobsinQ;
00501    xam_QLength = max_QLength;
00502    cnt_TCreate = num_TCreate;
00503    cnt_TDestroy= num_TDestroy;
00504    cnt_Limited = num_Limited;
00505    if (do_sync) SchedMutex.UnLock();
00506 
00507 // Format the stats and return them
00508 //
00509    return snprintf(buff, blen, statfmt, cnt_Jobs, cnt_JobsinQ, xam_QLength,
00510                    cnt_Workers, cnt_idl, cnt_TCreate, cnt_TDestroy,
00511                    cnt_Limited);
00512 }
00513 
00514 /******************************************************************************/
00515 /*                             T i m e S c h e d                              */
00516 /******************************************************************************/
00517   
00518 void XrdScheduler::TimeSched()
00519 {
00520    XrdJob *jp;
00521    int wtime;
00522 
00523 // Continuous loop until we find some work here
00524 //
00525    do {TimerMutex.Lock();
00526        if (TimerQueue) wtime = TimerQueue->SchedTime-time(0);
00527           else wtime = 60*60;
00528        if (wtime > 0)
00529           {TimerMutex.UnLock();
00530            TimerRings.Wait(wtime);
00531           } else {
00532            jp = TimerQueue;
00533            TimerQueue = jp->NextJob;
00534            Schedule(jp);
00535            TimerMutex.UnLock();
00536           }
00537        } while(1);
00538 }
00539 
00540 /******************************************************************************/
00541 /*                       P r i v a t e   M e t h o d s                        */
00542 /******************************************************************************/
00543 /******************************************************************************/
00544 /*                           h i r e   W o r k e r                            */
00545 /******************************************************************************/
00546   
00547 void XrdScheduler::hireWorker(int dotrace)
00548 {
00549    pthread_t tid;
00550    int retc;
00551 
00552 // First check if we reached the maximum number of workers
00553 //
00554    SchedMutex.Lock();
00555    if (num_Workers >= max_Workers)
00556       {num_Limited++;
00557        if ((num_Limited & 4095) == 1)
00558            XrdLog.Emsg("Scheduler","Thread limit has been reached!");
00559        SchedMutex.UnLock();
00560        return;
00561       }
00562    num_Workers++;
00563    num_TCreate++;
00564    SchedMutex.UnLock();
00565 
00566 // Start a new thread. We do this without the schedMutex to avoid hang-ups. If
00567 // we can't start a new thread, we recalculate the maximum number we can.
00568 //
00569    retc = XrdSysThread::Run(&tid, XrdStartWorking, (void *)this, 0, "Worker");
00570 
00571 // Now check the results and correct if we couldn't start the thread
00572 //
00573    if (retc)
00574       {XrdLog.Emsg("Scheduler", retc, "create worker thread");
00575        SchedMutex.Lock();
00576        num_Workers--;
00577        num_TCreate--;
00578        max_Workers = num_Workers;
00579        min_Workers = (max_Workers/10 ? max_Workers/10 : 1);
00580        stk_Workers = max_Workers/4*3;
00581        SchedMutex.UnLock();
00582       } else if (dotrace) TRACE(SCHED, "Now have " <<num_Workers <<" workers" );
00583 }
00584  
00585 /******************************************************************************/
00586 /*                             t r a c e E x i t                              */
00587 /******************************************************************************/
00588   
00589 void XrdScheduler::traceExit(pid_t pid, int status)
00590 {  const char *why;
00591    int   retc;
00592 
00593    if (WIFEXITED(status))
00594       {retc = WEXITSTATUS(status);
00595        why = " exited with rc=";
00596       } else if (WIFSIGNALED(status))
00597                 {retc = WTERMSIG(status);
00598                  why = " killed with signal ";
00599                 } else {retc = 0;
00600                         why = " changed state ";
00601                        }
00602    TRACE(SCHED, "Process " <<pid <<why <<retc);
00603 }

Generated on Tue Jul 5 14:46:15 2011 for ROOT_528-00b_version by  doxygen 1.5.1