XrdProofdPriorityMgr.cxx

Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofdPriorityMgr.cxx 29127 2009-06-22 09:07:57Z brun $
00002 // Author: G. Ganis Feb 2008
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // XrdProofdPriorityMgr                                                 //
00015 //                                                                      //
00016 // Author: G. Ganis, CERN, 2007                                         //
00017 //                                                                      //
00018 // Class managing session priorities.                                   //
00019 //                                                                      //
00020 //////////////////////////////////////////////////////////////////////////
00021 #include "XrdProofdPlatform.h"
00022 
00023 #include "XrdOuc/XrdOucStream.hh"
00024 #include "XrdSys/XrdSysPriv.hh"
00025 
00026 #include "XrdProofdAux.h"
00027 #include "XrdProofdManager.h"
00028 #include "XrdProofdPriorityMgr.h"
00029 #include "XrdProofGroup.h"
00030 
00031 // Tracing utilities
00032 #include "XrdProofdTrace.h"
00033 
00034 // Aux structures for scan through operations
00035 typedef struct {
00036    XrdProofGroupMgr *fGroupMgr;
00037    std::list<XrdProofdSessionEntry *> *fSortedList;
00038    bool error;
00039 } XpdCreateActiveList_t;
00040 
00041 //--------------------------------------------------------------------------
00042 //
00043 // XrdProofdPriorityCron
00044 //
00045 // Function run in a separate thread: it waits for notifications of
00046 // session status or group priority changes and refreshes the nice values
00047 //
00048 //--------------------------------------------------------------------------
00049 void *XrdProofdPriorityCron(void *p)
00050 {
00051    // This is an endless loop to periodically check the system
00052    XPDLOC(PMGR, "PriorityCron")
00053 
00054    XrdProofdPriorityMgr *mgr = (XrdProofdPriorityMgr *)p;
00055    if (!(mgr)) {
00056       TRACE(REQ, "undefined manager: cannot start");
00057       return (void *)0;
00058    }
00059 
00060    while(1) {
00061       // We wait for processes to communicate a session status change
00062       int pollRet = mgr->Pipe()->Poll(-1);
00063       if (pollRet > 0) {
00064          int rc = 0;
00065          XpdMsg msg;
00066          if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
00067             XPDERR("problems receiving message; errno: "<<-rc);
00068             continue;
00069          }
00070          // Parse type
00071          if (msg.Type() == XrdProofdPriorityMgr::kChangeStatus) {
00072             XrdOucString usr, grp;
00073             int opt, pid;
00074             rc = msg.Get(opt);
00075             rc = (rc == 0) ? msg.Get(usr) : rc;
00076             rc = (rc == 0) ? msg.Get(grp) : rc;
00077             rc = (rc == 0) ? msg.Get(pid) : rc;
00078             if (rc != 0) {
00079                XPDERR("kChangeStatus: problems parsing message : '"<<msg.Buf()<<"'; errno: "<<-rc);
00080                continue;
00081             }
00082             if (opt < 0) {
00083                // Remove
00084                mgr->RemoveSession(pid);
00085             } else if (opt > 0) {
00086                // Add
00087                mgr->AddSession(usr.c_str(), grp.c_str(), pid);
00088             } else {
00089                XPDERR("kChangeStatus: invalid opt: "<< opt);
00090             }
00091          } else if (msg.Type() == XrdProofdPriorityMgr::kSetGroupPriority) {
00092             XrdOucString grp;
00093             int prio = -1;
00094             rc = msg.Get(grp);
00095             rc = (rc == 0) ? msg.Get(prio) : rc;
00096             if (rc != 0) {
00097                XPDERR("kSetGroupPriority: problems parsing message; errno: "<<-rc);
00098                continue;
00099             }
00100             // Change group priority
00101             mgr->SetGroupPriority(grp.c_str(), prio);
00102          } else {
00103             XPDERR("unknown message type: "<< msg.Type());
00104          }
00105          // Communicate new priorities
00106          if (mgr->SetNiceValues() != 0) {
00107             XPDERR("problem setting nice values ");
00108          }
00109       }
00110    }
00111 
00112    // Should never come here
00113    return (void *)0;
00114 }
00115 
00116 //______________________________________________________________________________
00117 XrdProofdPriorityMgr::XrdProofdPriorityMgr(XrdProofdManager *mgr,
00118                                            XrdProtocol_Config *pi, XrdSysError *e)
00119                     : XrdProofdConfig(pi->ConfigFN, e)
00120 {
00121    // Constructor
00122    XPDLOC(PMGR, "XrdProofdPriorityMgr")
00123 
00124    fMgr = mgr;
00125    fSchedOpt = kXPD_sched_off;
00126    fPriorityMax = 20;
00127    fPriorityMin = 1;
00128 
00129    // Init pipe for the poller
00130    if (!fPipe.IsValid()) {
00131       TRACE(XERR, "unable to generate pipe for the priority poller");
00132       return;
00133    }
00134 
00135    // Configuration directives
00136    RegisterDirectives();
00137 }
00138 
00139 //__________________________________________________________________________
00140 static int DumpPriorityChanges(const char *, XrdProofdPriority *p, void *s)
00141 {
00142    // Reset the priority on entries
00143    XPDLOC(PMGR, "DumpPriorityChanges")
00144 
00145    XrdSysError *e = (XrdSysError *)s;
00146 
00147    if (p && e) {
00148       XrdOucString msg;
00149       XPDFORM(msg, "priority will be changed by %d for user(s): %s",
00150                    p->fDeltaPriority, p->fUser.c_str());
00151       TRACE(ALL, msg);
00152       // Check next
00153       return 0;
00154    }
00155 
00156    // Not enough info: stop
00157    return 1;
00158 }
00159 
00160 //__________________________________________________________________________
00161 int XrdProofdPriorityMgr::Config(bool rcf)
00162 {
00163    // Run configuration and parse the entered config directives.
00164    // Return 0 on success, -1 on error
00165    XPDLOC(PMGR, "PriorityMgr::Config")
00166 
00167    // Run first the configurator
00168    if (XrdProofdConfig::Config(rcf) != 0) {
00169       XPDERR("problems parsing file ");
00170       return -1;
00171    }
00172 
00173    XrdOucString msg;
00174    msg = (rcf) ? "re-configuring" : "configuring";
00175    TRACE(ALL, msg);
00176 
00177    // Notify change priority rules
00178    if (fPriorities.Num() > 0) {
00179       fPriorities.Apply(DumpPriorityChanges, (void *)fEDest);
00180    } else {
00181       TRACE(ALL, "no priority changes requested");
00182    }
00183 
00184    // Scheduling option
00185    if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->Num() > 1 && fSchedOpt != kXPD_sched_off) {
00186       XPDFORM(msg, "worker sched based on '%s' priorities",
00187                    (fSchedOpt == kXPD_sched_central) ? "central" : "local");
00188       TRACE(ALL, msg);
00189    }
00190 
00191    if (!rcf) {
00192       // Start poller thread
00193       pthread_t tid;
00194       if (XrdSysThread::Run(&tid, XrdProofdPriorityCron,
00195                               (void *)this, 0, "PriorityMgr poller thread") != 0) {
00196          XPDERR("could not start poller thread");
00197          return 0;
00198       }
00199       TRACE(ALL, "poller thread started");
00200    }
00201 
00202    // Done
00203    return 0;
00204 }
00205 
00206 //__________________________________________________________________________
00207 void XrdProofdPriorityMgr::RegisterDirectives()
00208 {
00209    // Register directives for configuration
00210 
00211    Register("schedopt", new XrdProofdDirective("schedopt", this, &DoDirectiveClass));
00212    Register("priority", new XrdProofdDirective("priority", this, &DoDirectiveClass));
00213 }
00214 
00215 //______________________________________________________________________________
00216 int XrdProofdPriorityMgr::DoDirective(XrdProofdDirective *d,
00217                                   char *val, XrdOucStream *cfg, bool rcf)
00218 {
00219    // Update the priorities of the active sessions.
00220    XPDLOC(PMGR, "PriorityMgr::DoDirective")
00221 
00222    if (!d)
00223       // undefined inputs
00224       return -1;
00225 
00226    if (d->fName == "priority") {
00227       return DoDirectivePriority(val, cfg, rcf);
00228    } else if (d->fName == "schedopt") {
00229       return DoDirectiveSchedOpt(val, cfg, rcf);
00230    }
00231    TRACE(XERR, "unknown directive: "<<d->fName);
00232    return -1;
00233 }
00234 
00235 //______________________________________________________________________________
00236 void XrdProofdPriorityMgr::SetGroupPriority(const char *grp, int priority)
00237 {
00238    // Change group priority. Used when a master pushes a priority to a worker.
00239 
00240    XrdProofGroup *g = fMgr->GroupsMgr()->GetGroup(grp);
00241    if (g)
00242       g->SetPriority((float)priority);
00243 
00244    // Make sure scheduling is ON
00245    SetSchedOpt(kXPD_sched_central);
00246 
00247    // Done
00248    return;
00249 }
00250 
00251 //__________________________________________________________________________
00252 static int ResetEntryPriority(const char *, XrdProofdSessionEntry *e, void *)
00253 {
00254    // Reset the priority on entries
00255 
00256    if (e) {
00257       e->SetPriority();
00258       // Check next
00259       return 0;
00260    }
00261 
00262    // Not enough info: stop
00263    return 1;
00264 }
00265 
00266 //__________________________________________________________________________
00267 static int CreateActiveList(const char *, XrdProofdSessionEntry *e, void *s)
00268 {
00269    // Run thorugh entries to create the sorted list of active entries
00270    XPDLOC(PMGR, "CreateActiveList")
00271 
00272    XpdCreateActiveList_t *cal = (XpdCreateActiveList_t *)s;
00273 
00274    XrdOucString emsg;
00275    if (e && cal) {
00276       XrdProofGroupMgr *gm = cal->fGroupMgr;
00277       std::list<XrdProofdSessionEntry *> *sorted = cal->fSortedList;
00278       if (gm) {
00279          XrdProofGroup *g = gm->GetGroup(e->fGroup.c_str());
00280          if (g) {
00281             float ef = g->FracEff() / g->Active();
00282             int nsrv = g->Active(e->fUser.c_str());
00283             if (nsrv > 0) {
00284                ef /= nsrv;
00285                e->fFracEff = ef;
00286                bool pushed = 0;
00287                std::list<XrdProofdSessionEntry *>::iterator ssvi;
00288                for (ssvi = sorted->begin() ; ssvi != sorted->end(); ssvi++) {
00289                   if (ef >= (*ssvi)->fFracEff) {
00290                      sorted->insert(ssvi, e);
00291                      pushed = 1;
00292                      break;
00293                   }
00294                }
00295                if (!pushed)
00296                   sorted->push_back(e);
00297                // Go to next
00298                return 0;
00299 
00300             } else {
00301                emsg = "no srv sessions for active client";
00302             }
00303          } else {
00304             emsg = "group not found: "; emsg += e->fGroup.c_str();
00305          }
00306       } else {
00307          emsg = "group manager undefined";
00308       }
00309    } else {
00310       emsg = "input structure or entry undefined";
00311    }
00312 
00313    // Some problem
00314    if (cal) cal->error = 1;
00315    TRACE(XERR, (e ? e->fUser : "---") << ": protocol error: "<<emsg);
00316    return 1;
00317 }
00318 
00319 //______________________________________________________________________________
00320 int XrdProofdPriorityMgr::SetNiceValues(int opt)
00321 {
00322    // Recalculate nice values taking into account all active users
00323    // and their priorities.
00324    // The type of sessions considered depend on 'opt':
00325    //    0          all active sessions
00326    //    1          master sessions
00327    //    2          worker sessionsg21
00328    // Return 0 on success, -1 otherwise.
00329    XPDLOC(PMGR, "PriorityMgr::SetNiceValues")
00330 
00331    TRACE(REQ, "------------------- Start ----------------------");
00332 
00333    TRACE(REQ, "opt: "<<opt);
00334 
00335    if (!fMgr->GroupsMgr() || fMgr->GroupsMgr()->Num() <= 1 || !IsSchedOn()) {
00336       // Nothing to do
00337       TRACE(REQ, "------------------- End ------------------------");
00338       return 0;
00339    }
00340 
00341    // At least two active session
00342    int nact = fSessions.Num();
00343    TRACE(DBG,  fMgr->GroupsMgr()->Num()<<" groups, " << nact<<" active sessions");
00344    if (nact <= 1) {
00345       // Restore default values
00346       if (nact == 1)
00347          fSessions.Apply(ResetEntryPriority, 0);
00348       // Nothing else to do
00349       TRACE(REQ, "------------------- End ------------------------");
00350       return 0;
00351    }
00352 
00353    XrdSysMutexHelper mtxh(&fMutex);
00354 
00355    // Determine which groups are active and their effective fractions
00356    int rc = 0;
00357    if ((rc = fMgr->GroupsMgr()->SetEffectiveFractions(IsSchedOn())) != 0) {
00358       // Failure
00359       TRACE(XERR, "failure from SetEffectiveFractions");
00360       rc = -1;
00361    }
00362 
00363    // Now create a list of active sessions sorted by decreasing effective fraction
00364    TRACE(DBG,  "creating a list of active sessions sorted by decreasing effective fraction ");
00365    std::list<XrdProofdSessionEntry *> sorted;
00366    XpdCreateActiveList_t cal = { fMgr->GroupsMgr(), &sorted, 0 };
00367    if (rc == 0)
00368       fSessions.Apply(CreateActiveList, (void *)&cal);
00369 
00370    if (!cal.error) {
00371       // Notify
00372       int i = 0;
00373       std::list<XrdProofdSessionEntry *>::iterator ssvi;
00374       if (TRACING(HDBG)) {
00375          for (ssvi = sorted.begin() ; ssvi != sorted.end(); ssvi++)
00376             TRACE(HDBG, i++ <<" eff: "<< (*ssvi)->fFracEff);
00377       }
00378 
00379       TRACE(DBG,  "calculating nice values");
00380 
00381       // The first has the max priority
00382       ssvi = sorted.begin();
00383       float xmax = (*ssvi)->fFracEff;
00384       if (xmax > 0.) {
00385          // This is for Unix
00386          int nice = 20 - fPriorityMax;
00387          (*ssvi)->SetPriority(nice);
00388          // The others have priorities rescaled wrt their effective fractions
00389          ssvi++;
00390          while (ssvi != sorted.end()) {
00391             int xpri = (int) ((*ssvi)->fFracEff / xmax * (fPriorityMax - fPriorityMin))
00392                                                       + fPriorityMin;
00393             nice = 20 - xpri;
00394             TRACE(DBG,  "    --> nice value for client "<< (*ssvi)->fUser<<" is "<<nice);
00395             (*ssvi)->SetPriority(nice);
00396             ssvi++;
00397          }
00398       } else {
00399          TRACE(XERR, "negative or null max effective fraction: "<<xmax);
00400          rc = -1;
00401       }
00402    } else {
00403       TRACE(XERR, "failure from CreateActiveList");
00404       rc = -1;
00405    }
00406    TRACE(REQ, "------------------- End ------------------------");
00407 
00408    // Done
00409    return rc;
00410 }
00411 
00412 //______________________________________________________________________________
00413 int XrdProofdPriorityMgr::DoDirectivePriority(char *val, XrdOucStream *cfg, bool)
00414 {
00415    // Process 'priority' directive
00416 
00417    if (!val || !cfg)
00418       // undefined inputs
00419       return -1;
00420 
00421    // Priority change directive: get delta_priority
00422    int dp = strtol(val,0,10);
00423    XrdProofdPriority *p = new XrdProofdPriority("*", dp);
00424    // Check if an 'if' condition is present
00425    if ((val = cfg->GetWord()) && !strncmp(val,"if",2)) {
00426       if ((val = cfg->GetWord()) && val[0]) {
00427          p->fUser = val;
00428       }
00429    }
00430    // Add to the list
00431    fPriorities.Rep(p->fUser.c_str(), p);
00432    return 0;
00433 }
00434 
00435 //______________________________________________________________________________
00436 int XrdProofdPriorityMgr::DoDirectiveSchedOpt(char *val, XrdOucStream *cfg, bool)
00437 {
00438    // Process 'schedopt' directive
00439    XPDLOC(PMGR, "PriorityMgr::DoDirectiveSchedOpt")
00440 
00441    if (!val || !cfg)
00442       // undefined inputs
00443       return -1;
00444 
00445    int pmin = -1;
00446    int pmax = -1;
00447    int opt = -1;
00448    // Defines scheduling options
00449    while (val && val[0]) {
00450       XrdOucString o = val;
00451       if (o.beginswith("min:")) {
00452          // The overall inflating factor
00453          o.replace("min:","");
00454          sscanf(o.c_str(), "%d", &pmin);
00455       } else if (o.beginswith("max:")) {
00456          // The overall inflating factor
00457          o.replace("max:","");
00458          sscanf(o.c_str(), "%d", &pmax);
00459       } else {
00460          if (o == "central")
00461             opt = kXPD_sched_central;
00462          else if (o == "local")
00463             opt = kXPD_sched_local;
00464       }
00465       // Check deprecated 'if' directive
00466       if (fMgr->Host() && cfg)
00467          if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
00468             return 0;
00469       // Next
00470       val = cfg->GetWord();
00471    }
00472 
00473    // Set the values (we need to do it here to avoid setting wrong values
00474    // when a non-matching 'if' condition is found)
00475    if (pmin > -1)
00476       fPriorityMin = (pmin >= 1 && pmin <= 40) ? pmin : fPriorityMin;
00477    if (pmax > -1)
00478       fPriorityMax = (pmax >= 1 && pmax <= 40) ? pmax : fPriorityMax;
00479    if (opt > -1)
00480       fSchedOpt = opt;
00481 
00482    // Make sure that min is <= max
00483    if (fPriorityMin > fPriorityMax) {
00484       TRACE(XERR, "inconsistent value for fPriorityMin (> fPriorityMax) ["<<
00485                   fPriorityMin << ", "<<fPriorityMax<<"] - correcting");
00486       fPriorityMin = fPriorityMax;
00487    }
00488 
00489    return 0;
00490 }
00491 
00492 //______________________________________________________________________________
00493 int XrdProofdPriorityMgr::RemoveSession(int pid)
00494 {
00495    // Remove from the active list the session with ID pid.
00496    // Return -ENOENT if not found, or 0.
00497 
00498    XrdOucString key; key += pid;
00499    return fSessions.Del(key.c_str());
00500 }
00501 
00502 //______________________________________________________________________________
00503 int XrdProofdPriorityMgr::AddSession(const char *u, const char *g, int pid)
00504 {
00505    // Add to the active list a session with ID pid. Check for duplications.
00506    // Returns 1 if the entry existed already and it has been replaced; or 0.
00507 
00508    int rc = 0;
00509    XrdOucString key; key += pid;
00510    XrdProofdSessionEntry *oldent = fSessions.Find(key.c_str());
00511    if (oldent) {
00512       rc = 1;
00513       fSessions.Rep(key.c_str(), new XrdProofdSessionEntry(u, g, pid));
00514    } else {
00515       fSessions.Add(key.c_str(), new XrdProofdSessionEntry(u, g, pid));
00516    }
00517 
00518    // Done
00519    return rc;
00520 }
00521 
00522 //__________________________________________________________________________
00523 int XrdProofdPriorityMgr::SetProcessPriority(int pid, const char *user, int &dp)
00524 {
00525    // Change priority of process pid belonging to user, if needed.
00526    // Return 0 on success, -errno in case of error
00527    XPDLOC(PMGR, "PriorityMgr::SetProcessPriority")
00528 
00529    // Change child process priority, if required
00530    if (fPriorities.Num() > 0) {
00531       XrdProofdPriority *pu = fPriorities.Find(user);
00532       if (pu) {
00533          dp = pu->fDeltaPriority;
00534          // Change the priority
00535          errno = 0;
00536          int priority = XPPM_NOPRIORITY;
00537          if ((priority = getpriority(PRIO_PROCESS, pid)) == -1 && errno != 0) {
00538             TRACE(XERR, "getpriority: errno: " << errno);
00539             return -errno;
00540          }
00541          // Set the priority
00542          int newp = priority + dp;
00543          XrdProofUI ui;
00544          XrdProofdAux::GetUserInfo(geteuid(), ui);
00545          XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
00546          if (XpdBadPGuard(pGuard, ui.fUid)) {
00547             TRACE(XERR, "could not get privileges");
00548             return -1;
00549          }
00550          TRACE(REQ, "got privileges ");
00551          errno = 0;
00552          if (setpriority(PRIO_PROCESS, pid, newp) != 0) {
00553             TRACE(XERR, "setpriority: errno: " << errno);
00554             return ((errno != 0) ? -errno : -1);
00555          }
00556          if ((getpriority(PRIO_PROCESS, pid)) != newp && errno != 0) {
00557             TRACE(XERR, "did not succeed: errno: " << errno);
00558             return -errno;
00559          }
00560       }
00561    }
00562 
00563    // We are done
00564    return 0;
00565 }
00566 
00567 //
00568 // Small class to describe an active session
00569 //
00570 //______________________________________________________________________________
00571 XrdProofdSessionEntry::XrdProofdSessionEntry(const char *u, const char *g, int pid)
00572                      : fUser(u), fGroup(g), fPid(pid)
00573 {
00574    // Constructor
00575    XPDLOC(PMGR, "XrdProofdSessionEntry")
00576 
00577    fPriority = XPPM_NOPRIORITY;
00578    fDefaultPriority = XPPM_NOPRIORITY;
00579    errno = 0;
00580    int prio = getpriority(PRIO_PROCESS, pid);
00581    if (errno != 0) {
00582       TRACE(XERR, " getpriority: errno: " << errno);
00583       return;
00584    }
00585    fDefaultPriority = prio;
00586 }
00587 
00588 //______________________________________________________________________________
00589 XrdProofdSessionEntry::~XrdProofdSessionEntry()
00590 {
00591    // Destructor
00592 
00593    SetPriority(fDefaultPriority);
00594 }
00595 
00596 //______________________________________________________________________________
00597 int XrdProofdSessionEntry::SetPriority(int priority)
00598 {
00599    // Change process priority
00600    XPDLOC(PMGR, "SessionEntry::SetPriority")
00601 
00602    if (priority != XPPM_NOPRIORITY)
00603       priority = fDefaultPriority;
00604 
00605    if (priority != fPriority) {
00606       // Set priority to the default value
00607       XrdProofUI ui;
00608       XrdProofdAux::GetUserInfo(geteuid(), ui);
00609       XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
00610       if (XpdBadPGuard(pGuard, ui.fUid)) {
00611          TRACE(XERR, "could not get privileges");
00612          return -1;
00613       }
00614       errno = 0;
00615       if (setpriority(PRIO_PROCESS, fPid, priority) != 0) {
00616          TRACE(XERR, "setpriority: errno: " << errno);
00617          return -1;
00618       }
00619       fPriority = priority;
00620    }
00621 
00622    // Done
00623    return 0;
00624 }

Generated on Tue Jul 5 14:51:51 2011 for ROOT_528-00b_version by  doxygen 1.5.1