XrdProofSched.cxx

Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofSched.cxx 29127 2009-06-22 09:07:57Z brun $
00002 // Author: G. Ganis  September 2007
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // XrdProofSched                                                        //
00015 //                                                                      //
00016 // Authors: G. Ganis, CERN, 2007                                        //
00017 //                                                                      //
00018 // Interface for a PROOF scheduler.                                     //
00019 // Alternative scheduler implementations should be provided as a shared //
00020 // library containing an implementation of this class. The library      //
00021 // must also provide a function to load an instance of this class       //
00022 // with the following signature (see commented example below):          //
00023 // extern "C" {                                                         //
00024 //    XrdProofSched *XrdgetProofSched(const char *cfg,                  //
00025 //                                    XrdProofdManager *mgr,            //
00026 //                                    XrdProofGroupMgr *grpmgr,         //
00027 //                                    XrdSysError *edest);              //
00028 // }                                                                    //
00029 // Here 'cfg' is the xrootd config file where directives to configure   //
00030 // the scheduler are specified, 'mgr' is the instance of the cluster    //
00031 // manager from which the scheduler can get info about the available    //
00032 // workers and their status, 'grpmgr' is the instance of the group      //
00033 // manager holding the definition of the groups for this run, and       //
00034 // 'edest' is the instance of the error logger to be used.              //
00035 // The scheduler is identified by a name of max 16 chars.               //
00036 //                                                                      //
00037 //////////////////////////////////////////////////////////////////////////
00038 
00039 #include <list>
00040 
00041 #include "XProofProtocol.h"
00042 #include "XrdProofdManager.h"
00043 #include "XrdProofdNetMgr.h"
00044 #include "XrdProofdProofServMgr.h"
00045 #include "XrdProofGroup.h"
00046 #include "XrdProofSched.h"
00047 #include "XrdProofdProofServ.h"
00048 #include "XrdProofWorker.h"
00049 
00050 #include "XrdOuc/XrdOucString.hh"
00051 #include "XrdOuc/XrdOucStream.hh"
00052 #ifdef OLDXRDOUC
00053 #  include "XrdSysToOuc.h"
00054 #  include "XrdOuc/XrdOucError.hh"
00055 #else
00056 #  include "XrdSys/XrdSysError.hh"
00057 #endif
00058 
00059 // Tracing
00060 #include "XrdProofdTrace.h"
00061 
00062 //
00063 // Example of scheduler loader for an implementation called XrdProofSchedDyn
00064 //
00065 // extern "C" {
00066 // //______________________________________________________________________________
00067 // XrdProofSched *XrdgetProofSched(const char *cfg, XrdProofdManager *mgr,
00068 //                                 XrdProofGroupMgr *grpmgr, XrdSysError *edest)
00069 // {
00070 //   // This scheduler is meant to live in a shared library. The interface below is
00071 //   // used by the server to obtain a copy of the scheduler object.
00072 //
00073 //   XrdProofSchedDyn *pss = new XrdProofSchedDyn(mgr, grpmgr, edest);
00074 //   if (pss && pss->Config(cfg) == 0) {
00075 //      return (XrdProofSched *) pss;
00076 //   }
00077 //   if (pss)
00078 //      delete pss;
00079 //   return (XrdProofSched *)0;
00080 // }}
00081 
00082 //--------------------------------------------------------------------------
00083 //
00084 // XrdProofSchedCron
00085 //
00086 // Scheduler thread
00087 //
00088 //--------------------------------------------------------------------------
00089 void *XrdProofSchedCron(void *p)
00090 {
00091    // This is an endless loop to check the system periodically or when
00092    // triggered via a message in a dedicated pipe
00093    XPDLOC(SCHED, "SchedCron")
00094 
00095    XrdProofSched *sched = (XrdProofSched *)p;
00096    if (!(sched)) {
00097       TRACE(XERR, "undefined scheduler: cannot start");
00098       return (void *)0;
00099    }
00100 
00101    // Time of last session check
00102    int lastcheck = time(0), ckfreq = sched->CheckFrequency(), deltat = 0;
00103    while(1) {
00104       // We wait for processes to communicate a session status change
00105       if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
00106          deltat = ckfreq;
00107       int pollRet = sched->Pipe()->Poll(deltat);
00108 
00109       if (pollRet > 0) {
00110          // Read message
00111          XpdMsg msg;
00112          int rc = 0;
00113          if ((rc = sched->Pipe()->Recv(msg)) != 0) {
00114             XPDERR("problems receiving message; errno: "<<-rc);
00115             continue;
00116          }
00117          // Parse type
00118          XrdOucString buf;
00119          if (msg.Type() == XrdProofSched::kReschedule) {
00120 
00121             TRACE(ALL, "received kReschedule");
00122 
00123             // Reschedule
00124             sched->Reschedule();
00125 
00126          } else {
00127 
00128             TRACE(XERR, "unknown type: "<<msg.Type());
00129             continue;
00130          }
00131       } else {
00132          // Notify
00133          TRACE(ALL, "running regular checks");
00134          // Run regular rescheduling checks
00135          sched->Reschedule();
00136          // Remember when ...
00137          lastcheck = time(0);
00138       }
00139    }
00140 
00141    // Should never come here
00142    return (void *)0;
00143 }
00144 
00145 //______________________________________________________________________________
00146 static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
00147 {
00148    // Compare two workers for sorting
00149 
00150    return ((lhs && rhs &&
00151             lhs->GetNActiveSessions() < rhs->GetNActiveSessions()) ? 1 : 0);
00152 }
00153 
00154 //______________________________________________________________________________
00155 int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
00156 {
00157    // Generic directive processor
00158 
00159    if (!d || !(d->fVal))
00160       // undefined inputs
00161       return -1;
00162 
00163    return ((XrdProofSched *)d->fVal)->ProcessDirective(d, val, cfg, rcf);
00164 }
00165 
00166 //______________________________________________________________________________
00167 XrdProofSched::XrdProofSched(const char *name,
00168                              XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr,
00169                              const char *cfn,  XrdSysError *e)
00170               : XrdProofdConfig(cfn, e)
00171 {
00172    // Constructor
00173 
00174    fValid = 1;
00175    fMgr = mgr;
00176    fGrpMgr = grpmgr;
00177    fNextWrk = 1;
00178    fEDest = e;
00179    fUseFIFO = 0;
00180    ResetParameters();
00181 
00182    memset(fName, 0, kXPSMXNMLEN);
00183    if (name)
00184       memcpy(fName, name, kXPSMXNMLEN-1);
00185 
00186    // Configuration directives
00187    RegisterDirectives();
00188 }
00189 
00190 //__________________________________________________________________________
00191 void XrdProofSched::RegisterDirectives()
00192 {
00193    // Register directives for configuration
00194 
00195    Register("schedparam", new XrdProofdDirective("schedparam", this, &DoDirectiveClass));
00196    Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
00197 }
00198 
00199 //______________________________________________________________________________
00200 int XrdProofSched::DoDirective(XrdProofdDirective *d,
00201                                char *val, XrdOucStream *cfg, bool rcf)
00202 {
00203    // Update the priorities of the active sessions.
00204    XPDLOC(SCHED, "Sched::DoDirective")
00205 
00206    if (!d)
00207       // undefined inputs
00208       return -1;
00209 
00210    if (d->fName == "schedparam") {
00211       return DoDirectiveSchedParam(val, cfg, rcf);
00212    } else if (d->fName == "resource") {
00213       return DoDirectiveResource(val, cfg, rcf);
00214    }
00215    TRACE(XERR,"unknown directive: "<<d->fName);
00216    return -1;
00217 }
00218 
00219 
00220 //______________________________________________________________________________
00221 void XrdProofSched::ResetParameters()
00222 {
00223    // Reset values for the configurable parameters
00224 
00225    fMaxSessions = -1;
00226    fMaxRunning = -1;
00227    fWorkerMax = -1;
00228    fWorkerSel = kSSORoundRobin;
00229    fOptWrksPerUnit = 1;
00230    fMinForQuery = 0;
00231    fNodesFraction = 0.5;
00232    fCheckFrequency = 30;
00233 }
00234 
00235 //______________________________________________________________________________
00236 int XrdProofSched::Config(bool rcf)
00237 {
00238    // Configure this instance using the content of file 'cfn'.
00239    // Return 0 on success, -1 in case of failure (file does not exists
00240    // or containing incoherent information).
00241    XPDLOC(SCHED, "Sched::Config")
00242 
00243    // Run first the configurator
00244    if (XrdProofdConfig::Config(rcf) != 0) {
00245       XPDERR("problems parsing file ");
00246       fValid = 0;
00247       return -1;
00248    }
00249 
00250    int rc = 0;
00251 
00252    XrdOucString msg;
00253 
00254    // Notify
00255    XPDFORM(msg, "maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
00256                 fMaxSessions, fMaxRunning, fWorkerMax, fWorkerSel, fUseFIFO);
00257    TRACE(DBG, msg);
00258 
00259    if (!rcf) {
00260       // Start cron thread
00261       pthread_t tid;
00262       if (XrdSysThread::Run(&tid, XrdProofSchedCron,
00263                            (void *)this, 0, "Scheduler cron thread") != 0) {
00264          XPDERR("could not start cron thread");
00265          fValid = 0;
00266          return 0;
00267       }
00268       TRACE(ALL, "cron thread started");
00269    }
00270 
00271    // Done
00272    return rc;
00273 }
00274 
00275 //______________________________________________________________________________
00276 int XrdProofSched::Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
00277 {
00278    // Queue a query in the session; if this is the first querym enqueue also
00279    // the session
00280    XPDDOM(SCHED)
00281 
00282    if (xps->Enqueue(query) == 1) {
00283       std::list<XrdProofdProofServ *>::iterator ii;
00284       for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
00285          if ((*ii)->Status() == kXPD_running) break;
00286       }
00287       if (ii != fQueue.end()) {
00288          fQueue.insert(ii, xps);
00289       } else {
00290          fQueue.push_back(xps);
00291       }
00292    }
00293    if (TRACING(DBG)) DumpQueues("Enqueue");
00294 
00295    return 0;
00296 }
00297 
00298 //______________________________________________________________________________
00299 void XrdProofSched::DumpQueues(const char *prefix)
00300 {
00301    // Dump the content of the waiting sessions queue
00302 
00303    XPDLOC(SCHED, "DumpQueues")
00304 
00305    TRACE(ALL," ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
00306    if (prefix) TRACE(ALL, " +++ Called from: "<<prefix);
00307    TRACE(ALL," +++ # of waiting sessions: "<<fQueue.size()); 
00308    std::list<XrdProofdProofServ *>::iterator ii;
00309    int i = 0;
00310    for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
00311       TRACE(ALL," +++ #"<<++i<<" client:"<< (*ii)->Client()<<" # of queries: "<< (*ii)->Queries()->size());
00312    }
00313    TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
00314 
00315    return;
00316 }
00317 
00318 //______________________________________________________________________________
00319 XrdProofdProofServ *XrdProofSched::FirstSession()
00320 {
00321    // Get first valid session.
00322    // The dataset information can be used to assign workers.
00323    XPDDOM(SCHED)
00324 
00325    if (fQueue.empty())
00326       return 0;
00327    XrdProofdProofServ *xps = fQueue.front();
00328    while (xps && !(xps->IsValid())) {
00329       fQueue.remove(xps);
00330       xps = fQueue.front();
00331    }
00332    if (TRACING(DBG)) DumpQueues("FirstSession");
00333    // The session will be removed after workers are assigned
00334    return xps;
00335 }
00336 
00337 //______________________________________________________________________________
00338 int XrdProofSched::GetNumWorkers(XrdProofdProofServ *xps)
00339 {
00340    // Calculate the number of workers to be used given the state of the cluster
00341    XPDLOC(SCHED, "Sched::GetNumWorkers")
00342 
00343    // Go through the list of hosts and see how many CPUs are not used.
00344    int nFreeCPUs = 0;
00345    std::list<XrdProofWorker *> *wrks = fMgr->NetMgr()->GetActiveWorkers();
00346    std::list<XrdProofWorker *>::iterator iter;
00347    for (iter = wrks->begin(); iter != wrks->end(); ++iter) {
00348       TRACE(DBG, (*iter)->fImage<<" : # act: "<<(*iter)->fProofServs.size());
00349       if ((*iter)->fType != 'M' && (*iter)->fType != 'S'
00350           && (int) (*iter)->fProofServs.size() < fOptWrksPerUnit)
00351          // add number of free slots
00352          nFreeCPUs += fOptWrksPerUnit - (*iter)->fProofServs.size();
00353    }
00354 
00355    float priority = 1;
00356    XrdProofGroup *grp = 0;
00357    if (fGrpMgr && xps->Group())
00358       grp = fGrpMgr->GetGroup(xps->Group());
00359    if (grp) {
00360       std::list<XrdProofdProofServ *> *sessions = fMgr->SessionMgr()->ActiveSessions();
00361       std::list<XrdProofdProofServ *>::iterator sesIter;
00362       float summedPriority = 0;
00363       for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
00364          if ((*sesIter)->Group()) {
00365             XrdProofGroup *g = fGrpMgr->GetGroup((*sesIter)->Group());
00366             if (g)
00367                summedPriority += g->Priority();
00368          }
00369       }
00370       if (summedPriority > 0)
00371          priority = (grp->Priority() * sessions->size()) / summedPriority;
00372    }
00373 
00374    int nWrks = (int)(nFreeCPUs * fNodesFraction * priority);
00375    if (nWrks <= fMinForQuery) {
00376       nWrks = fMinForQuery;
00377    } else if (nWrks >= (int) wrks->size()) {
00378       nWrks = wrks->size() - 1;
00379    }
00380    TRACE(DBG, nFreeCPUs<<" : "<< nWrks);
00381 
00382    return nWrks;
00383 }
00384 
00385 //______________________________________________________________________________
00386 int XrdProofSched::GetWorkers(XrdProofdProofServ *xps,
00387                               std::list<XrdProofWorker *> *wrks,
00388                               const char *querytag)
00389 {
00390    // Get a list of workers that can be used by session 'xps'.
00391    // The return code is:
00392    //  -1     Some failure occured; cannot continue
00393    //   0     A new list has been assigned to the session 'xps' and
00394    //         returned in 'wrks'
00395    //   1     The list currently assigned to the session is the one
00396    //         to be used
00397    //   2     No worker could be assigned now; session should be queued
00398 
00399    XPDLOC(SCHED, "Sched::GetWorkers")
00400 
00401    int rc = 0;
00402 
00403    TRACE(REQ, "enter: query tag: "<< ((querytag) ? querytag : ""));
00404 
00405    // Static or dynamic
00406    bool isDynamic = 1;
00407    if (querytag && !strncmp(querytag, XPD_GW_Static, strlen(XPD_GW_Static) - 1)) {
00408       isDynamic = 0;
00409    }
00410 
00411    // Check if the current assigned list of workers is valid
00412    if (querytag && xps && xps->Workers()->Num() > 0) {
00413       if (TRACING(REQ)) xps->DumpQueries();
00414       const char *cqtag = (xps->CurrentQuery()) ? xps->CurrentQuery()->GetTag() : "undef";
00415       TRACE(REQ, "current query tag: "<< cqtag );
00416       if (!strcmp(querytag, cqtag)) {
00417          // Remove the query to be processed from the queue
00418          xps->RemoveQuery(cqtag);
00419          TRACE(REQ, "current assignment for session "<< xps->SrvPID() << " is valid");
00420          // Current assignement is valid
00421          return 1;
00422       }
00423    }
00424 
00425    // The caller must provide a list where to store the result
00426    if (!wrks)
00427       return -1;
00428 
00429    // If the session has already assigned workers or there are
00430    // other queries waiting - just enqueue
00431    // FIFO is enforced by dynamic mode so it is checked just in case
00432    if (isDynamic) {
00433       if (fUseFIFO && xps->Workers()->Num() > 0) {
00434          if (!xps->GetQuery(querytag))
00435             Enqueue(xps, new XrdProofQuery(querytag));
00436          if (TRACING(DBG)) xps->DumpQueries();
00437          // Signal enqueing
00438          TRACE(REQ, "session has already assigned workers: enqueue");
00439          return 2;
00440       }
00441    }
00442 
00443    // The current, full list
00444    std::list<XrdProofWorker *> *acws = 0;
00445 
00446    if (!fMgr || !(acws = fMgr->NetMgr()->GetActiveWorkers()))
00447       return -1;
00448 
00449    // Point to the master element
00450    XrdProofWorker *mst = acws->front();
00451    if (!mst)
00452       return -1;
00453 
00454    if (fWorkerSel == kSSOLoadBased) {
00455       // Dynamic scheduling: the scheduler will determine the #workers
00456       // to be used based on the current load and assign the least loaded ones
00457 
00458       // Sort the workers by the load
00459       XrdProofWorker::Sort(acws, XpdWrkComp);
00460 
00461       // Get the advised number
00462       int nw = GetNumWorkers(xps);
00463 
00464       if (nw > 0) {
00465          // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
00466          wrks->push_back(mst);
00467 
00468          std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
00469          while (nw--) {
00470             nxWrk++;
00471             // Add export version of the info
00472             // (stats are updated in XrdProofdProtocol::GetWorkers)
00473             wrks->push_back(*nxWrk);
00474          }
00475       } else {
00476          // if no workers were assigned
00477          // enqueue or send a list with only the master (processing refused)
00478          if (fUseFIFO) {
00479             // Enqueue the query/session
00480             // the returned list of workers was not filled
00481             if (!xps->GetQuery(querytag))
00482                Enqueue(xps, new XrdProofQuery(querytag));
00483             if (TRACING(DBG)) xps->DumpQueries();
00484             // Signal enqueing
00485             TRACE(REQ, "no workers currently available: session enqueued");
00486             return 2;
00487          } else {
00488             // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
00489             wrks->push_back(mst);
00490          }
00491       }
00492       // Done
00493       return 0;
00494    }
00495 
00496    // Check if the check on the max number of sessions is enabled
00497    // We need at least 1 master and a worker
00498    std::list<XrdProofWorker *> *acwseff = 0;
00499    int maxnum = (querytag && strcmp(querytag, XPD_GW_Static)) ? fMaxRunning : fMaxSessions;
00500    bool ok = 0;
00501    if (isDynamic) {
00502       if (maxnum > 0) {
00503          acwseff = new std::list<XrdProofWorker *>;
00504          std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
00505          if ((*xWrk)->Active() < maxnum) {
00506             acwseff->push_back(*xWrk);
00507             xWrk++;
00508             for (; xWrk != acws->end(); xWrk++) {
00509                if ((*xWrk)->Active() < maxnum) {
00510                   acwseff->push_back(*xWrk);
00511                   ok = 1;
00512                }
00513             }
00514          } else if (!fUseFIFO) {
00515             TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
00516          }
00517          // Check the result
00518          if (!ok) { delete acwseff; acwseff = 0; }
00519          acws = acwseff;
00520       }
00521    } else {
00522       if (maxnum > 0) {
00523          // This is over-conservative for sub-selectiob (random, or round-robin options)
00524          // A better solution should be implemented for that.
00525          int nactsess = mst->GetNActiveSessions();
00526          TRACE(REQ, "act sess ... " << nactsess);
00527          if (nactsess < maxnum) {
00528             ok = 1;
00529          } else if (!fUseFIFO) {
00530             TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
00531          }
00532          // Check the result
00533          if (!ok) acws = acwseff;
00534       }
00535    }
00536 
00537    // Make sure that something has been found
00538    if (!acws || acws->size() <= 1) {
00539       if (fUseFIFO) {
00540          // Enqueue the query/session
00541          // the returned list of workers was not filled
00542          if (!xps->GetQuery(querytag))
00543             Enqueue(xps, new XrdProofQuery(querytag));
00544          if (TRACING(REQ)) xps->DumpQueries();
00545          // Notify enqueing
00546          TRACE(REQ, "no workers currently available: session enqueued");
00547          return 2;
00548       } else {
00549          TRACE(XERR, "no worker available: do nothing");
00550          if (acwseff) { delete acwseff; acwseff = 0; }
00551          return -1;
00552       }
00553    }
00554 
00555    // If the session has already assigned workers just return
00556    if (xps->Workers()->Num() > 0) {
00557       // Current assignement is valid
00558       return 1;
00559    }
00560 
00561    // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
00562    wrks->push_back(mst);
00563 
00564    if (fWorkerMax > 0 && fWorkerMax < (int) acws->size()) {
00565 
00566       // Now the workers
00567       if (fWorkerSel == kSSORandom) {
00568          // Random: the first time init the machine
00569          static bool rndmInit = 0;
00570          if (!rndmInit) {
00571             const char *randdev = "/dev/urandom";
00572             int fd;
00573             unsigned int seed;
00574             if ((fd = open(randdev, O_RDONLY)) != -1) {
00575                if (read(fd, &seed, sizeof(seed)) != sizeof(seed)) {
00576                   TRACE(XERR, "problems reading seed; errno: "<< errno);
00577                }
00578                srand(seed);
00579                close(fd);
00580                rndmInit = 1;
00581             }
00582          }
00583          // Selection
00584          int nwt = acws->size();
00585          std::vector<int> walloc(nwt, 0);
00586          std::vector<XrdProofWorker *> vwrk(nwt);
00587 
00588          // Fill the vector with cumulative number of actives
00589          int namx = -1;
00590          int i = 1;
00591          std::list<XrdProofWorker *>::iterator iwk = acws->begin();
00592          iwk++; // Skip master
00593          for ( ; iwk != acws->end(); iwk++) {
00594             vwrk[i] = *iwk;
00595             int na = (*iwk)->Active();
00596             printf(" %d", na);
00597             walloc[i] = na + walloc[i-1];
00598             i++;
00599             namx = (na > namx) ? na : namx;
00600          }
00601          printf("\n");
00602          // Normalize
00603          for (i = 1; i < nwt; i++) {
00604             if (namx > 0)
00605                walloc[i] = namx*i - walloc[i] + i;
00606             else
00607                walloc[i] = i;
00608          }
00609          int natot = walloc[nwt - 1];
00610 
00611          int nw = fWorkerMax;
00612          while (nw--) {
00613             // Normalized number
00614             int maxAtt = 10000, natt = 0;
00615             int iw = -1;
00616             while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
00617                int jw = rand() % natot;
00618                for (i = 0; i < nwt; i++) {
00619                   if (jw < walloc[i]) {
00620                      // re-normalize the weights for the higher index entries
00621                      int j = 0;
00622                      for (j = i; j < nwt; j++) {
00623                         if (walloc[j] > 0)
00624                            walloc[j]--;
00625                      }
00626                      natot--;
00627                      iw = i;
00628                      break;
00629                   }
00630                }
00631             }
00632 
00633             if (iw > -1) {
00634                // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
00635                wrks->push_back(vwrk[iw]);
00636             } else {
00637                // Unable to generate the right number
00638                TRACE(XERR, "random generation failed");
00639                rc = -1;
00640                break;
00641             }
00642          }
00643 
00644       } else {
00645          if (fNextWrk >= (int) acws->size())
00646             fNextWrk = 1;
00647          int iw = 0;
00648          std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
00649          int nw = fWorkerMax;
00650          while (nw--) {
00651             while (iw != fNextWrk) {
00652                nxWrk++;
00653                iw++;
00654             }
00655             // Add export version of the info
00656             // (stats are updated in XrdProofdProtocol::GetWorkers)
00657             wrks->push_back(*nxWrk);
00658             // Update next worker index
00659             fNextWrk++;
00660             if (fNextWrk >= (int) acws->size()) {
00661                fNextWrk = 1;
00662                iw = 0;
00663                nxWrk = acws->begin();
00664             }
00665          }
00666       }
00667    } else {
00668       // The full list
00669       std::list<XrdProofWorker *>::iterator iw = acws->begin();
00670       iw++;
00671       while (iw != acws->end()) {
00672          // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
00673          wrks->push_back(*iw);
00674          iw++;
00675       }
00676    }
00677 
00678    // Make sure that something has been found
00679    if (wrks->size() <= 1) {
00680       TRACE(XERR, "no worker found: do nothing");
00681       rc = -1;
00682    }
00683 
00684    // Cleanup
00685    if (acwseff) { delete acwseff; acwseff = 0; }
00686 
00687    return rc;
00688 }
00689 
00690 //______________________________________________________________________________
00691 int XrdProofSched::Reschedule()
00692 {
00693    // Consider starting some query from the queue.
00694    // to be called after some resources are free (e.g. by a finished query)
00695    // This method is doing the full transaction of finding the session to
00696    // resume, assigning it workers and sending a resume message.
00697    // In this way there is not possibility of interference with other GetWorkers
00698    // return 0 in case of success and -1 in case of an error
00699    XPDDOM(SCHED)
00700 
00701    if (fUseFIFO && TRACING(DBG)) DumpQueues("Reschedule");
00702 
00703    if (!fQueue.empty()) {
00704       // Any advanced scheduling algorithms can be done here
00705 
00706       XrdProofdProofServ *xps = FirstSession();
00707       XrdOucString wrks;
00708       // Call GetWorkers in the manager to mark the assignment.
00709       XrdOucString qtag;
00710       if (xps && xps->CurrentQuery()) {
00711          qtag = xps->CurrentQuery()->GetTag();
00712          if (qtag.beginswith(XPD_GW_Static)) {
00713             qtag = XPD_GW_Static;
00714             qtag.replace(":","");
00715          }
00716       }
00717       if (fMgr->GetWorkers(wrks, xps, qtag.c_str()) < 0 ) {
00718          // Something wrong
00719          return -1;
00720       } else {
00721          // Send buffer
00722          // if workers were assigned remove the session from the queue
00723          if (wrks.length() > 0 && wrks != XPD_GW_QueryEnqueued) {
00724             // Send the resume message: the workers will be send in response to a
00725             // GetWorkers message
00726             xps->Resume();
00727             // Acually remove the session from the queue
00728             fQueue.remove(xps);
00729             // Put the session at the end of the queue
00730             // > 1 because the query is kept in the queue until 2nd GetWorkers
00731             if (xps->Queries()->size() > 1)
00732                fQueue.push_back(xps);
00733             if (TRACING(DBG)) DumpQueues("Reschedule 2");
00734          } // else add workers to the running sessions (once it's possible)
00735 
00736       }
00737 
00738    } //else add workers to the running sessions (once it's possible)
00739 
00740    return 0;
00741 }
00742 
00743 //______________________________________________________________________________
00744 int XrdProofSched::ExportInfo(XrdOucString &sbuf)
00745 {
00746    // Fill sbuf with some info about our current status
00747 
00748    // Selection type
00749    const char *osel[] = { "all", "round-robin", "random", "load-based"};
00750    sbuf += "Selection: ";
00751    sbuf += osel[fWorkerSel+1];
00752    if (fWorkerSel > -1) {
00753       sbuf += ", max workers: ";
00754       sbuf += fWorkerMax; sbuf += " &";
00755    }
00756 
00757    // The full list
00758    std::list<XrdProofWorker *> *acws = fMgr->NetMgr()->GetActiveWorkers();
00759    std::list<XrdProofWorker *>::iterator iw;
00760    for (iw = acws->begin(); iw != acws->end(); ++iw) {
00761       sbuf += (*iw)->fType;
00762       sbuf += ": "; sbuf += (*iw)->fHost;
00763       if ((*iw)->fPort > -1) {
00764          sbuf += ":"; sbuf += (*iw)->fPort;
00765       } else
00766          sbuf += "     ";
00767       sbuf += "  sessions: "; sbuf += (*iw)->Active();
00768       sbuf += " &";
00769    }
00770 
00771    // Done
00772    return 0;
00773 }
00774 
00775 //______________________________________________________________________________
00776 int XrdProofSched::ProcessDirective(XrdProofdDirective *d,
00777                                     char *val, XrdOucStream *cfg, bool rcf)
00778 {
00779    // Update the priorities of the active sessions.
00780    XPDLOC(SCHED, "Sched::ProcessDirective")
00781 
00782    if (!d)
00783       // undefined inputs
00784       return -1;
00785 
00786    if (d->fName == "schedparam") {
00787       return DoDirectiveSchedParam(val, cfg, rcf);
00788    } else if (d->fName == "resource") {
00789       return DoDirectiveResource(val, cfg, rcf);
00790    }
00791    TRACE(XERR, "unknown directive: "<<d->fName);
00792    return -1;
00793 }
00794 
00795 //______________________________________________________________________________
00796 int XrdProofSched::DoDirectiveSchedParam(char *val, XrdOucStream *cfg, bool)
00797 {
00798    // Process 'schedparam' directive
00799    XPDLOC(SCHED, "Sched::DoDirectiveSchedParam")
00800 
00801    if (!val || !cfg)
00802       // undefined inputs
00803       return -1;
00804 
00805    // Get the parameters
00806    while (val && val[0]) {
00807       XrdOucString s(val);
00808       if (s.beginswith("wmx:")) {
00809          s.replace("wmx:","");
00810          fWorkerMax = strtol(s.c_str(), (char **)0, 10);
00811       } else if (s.beginswith("mxsess:")) {
00812          s.replace("mxsess:","");
00813          fMaxSessions = strtol(s.c_str(), (char **)0, 10);
00814       } else if (s.beginswith("mxrun:")) {
00815          s.replace("mxrun:","");
00816          fMaxRunning = strtol(s.c_str(), (char **)0, 10);
00817       } else if (s.beginswith("selopt:")) {
00818          if (s.endswith("random"))
00819             fWorkerSel = kSSORandom;
00820          else if (s.endswith("load"))
00821             fWorkerSel = kSSOLoadBased;
00822          else
00823             fWorkerSel = kSSORoundRobin;
00824       } else if (s.beginswith("fraction:")) {
00825          s.replace("fraction:","");
00826          fNodesFraction = strtod(s.c_str(), (char **)0);
00827       } else if (s.beginswith("optnwrks:")) {
00828          s.replace("optnwrks:","");
00829          fOptWrksPerUnit = strtol(s.c_str(), (char **)0, 10);
00830       } else if (s.beginswith("minforquery:")) {
00831          s.replace("minforquery:","");
00832          fMinForQuery = strtol(s.c_str(), (char **)0, 10);
00833       } else if (s.beginswith("queue:")) {
00834          if (s.endswith("fifo")) {
00835             fUseFIFO = 1;
00836          }
00837       } else if (strncmp(val, "default", 7)) {
00838          // This line applies to another scheduler
00839          ResetParameters();
00840          break;
00841       }
00842       val = cfg->GetWord();
00843    }
00844 
00845    // If the max number of sessions is limited then there is no lower bound
00846    // the number of workers per query
00847    if (fMaxSessions > 0) {
00848       fMinForQuery = 0;
00849       // And there is an upper limit on the number of running sessions
00850       if (fMaxRunning < 0 || fMaxRunning > fMaxSessions)
00851          fMaxRunning = fMaxSessions;
00852    }
00853 
00854    // The FIFO size make sense only in non-load based mode
00855    if (fWorkerSel == kSSOLoadBased && fMaxRunning > 0) {
00856       TRACE(ALL, "WARNING: in Load-Based mode the max number of sessions"
00857                  " to be run is determined dynamically");
00858    }
00859 
00860    return 0;
00861 }
00862 
00863 //______________________________________________________________________________
00864 int XrdProofSched::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
00865 {
00866    // Process 'resource' directive
00867 
00868    if (!val || !cfg)
00869       // undefined inputs
00870       return -1;
00871 
00872    // Get the scheduler name
00873    if (strncmp(val, "static", 6) && strncmp(val, "default", 7))
00874       return 0;
00875    // Get the values
00876    while ((val = cfg->GetWord()) && val[0]) {
00877       XrdOucString s(val);
00878       if (s.beginswith("wmx:")) {
00879          s.replace("wmx:","");
00880          fWorkerMax = strtol(s.c_str(), (char **)0, 10);
00881       } else if (s.beginswith("mxsess:")) {
00882          s.replace("mxsess:","");
00883          fMaxSessions = strtol(s.c_str(), (char **)0, 10);
00884       } else if (s.beginswith("selopt:")) {
00885          if (s.endswith("random"))
00886             fWorkerSel = kSSORandom;
00887          else
00888             fWorkerSel = kSSORoundRobin;
00889       }
00890    }
00891    return 0;
00892 }

Generated on Tue Jul 5 14:51:52 2011 for ROOT_528-00b_version by  doxygen 1.5.1