00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "XrdProofdPlatform.h"
00022
00023 #include "XrdOuc/XrdOucStream.hh"
00024 #include "XrdSys/XrdSysPriv.hh"
00025
00026 #include "XrdProofdAux.h"
00027 #include "XrdProofdManager.h"
00028 #include "XrdProofdPriorityMgr.h"
00029 #include "XrdProofGroup.h"
00030
00031
00032 #include "XrdProofdTrace.h"
00033
00034
00035 typedef struct {
00036 XrdProofGroupMgr *fGroupMgr;
00037 std::list<XrdProofdSessionEntry *> *fSortedList;
00038 bool error;
00039 } XpdCreateActiveList_t;
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 void *XrdProofdPriorityCron(void *p)
00050 {
00051
00052 XPDLOC(PMGR, "PriorityCron")
00053
00054 XrdProofdPriorityMgr *mgr = (XrdProofdPriorityMgr *)p;
00055 if (!(mgr)) {
00056 TRACE(REQ, "undefined manager: cannot start");
00057 return (void *)0;
00058 }
00059
00060 while(1) {
00061
00062 int pollRet = mgr->Pipe()->Poll(-1);
00063 if (pollRet > 0) {
00064 int rc = 0;
00065 XpdMsg msg;
00066 if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
00067 XPDERR("problems receiving message; errno: "<<-rc);
00068 continue;
00069 }
00070
00071 if (msg.Type() == XrdProofdPriorityMgr::kChangeStatus) {
00072 XrdOucString usr, grp;
00073 int opt, pid;
00074 rc = msg.Get(opt);
00075 rc = (rc == 0) ? msg.Get(usr) : rc;
00076 rc = (rc == 0) ? msg.Get(grp) : rc;
00077 rc = (rc == 0) ? msg.Get(pid) : rc;
00078 if (rc != 0) {
00079 XPDERR("kChangeStatus: problems parsing message : '"<<msg.Buf()<<"'; errno: "<<-rc);
00080 continue;
00081 }
00082 if (opt < 0) {
00083
00084 mgr->RemoveSession(pid);
00085 } else if (opt > 0) {
00086
00087 mgr->AddSession(usr.c_str(), grp.c_str(), pid);
00088 } else {
00089 XPDERR("kChangeStatus: invalid opt: "<< opt);
00090 }
00091 } else if (msg.Type() == XrdProofdPriorityMgr::kSetGroupPriority) {
00092 XrdOucString grp;
00093 int prio = -1;
00094 rc = msg.Get(grp);
00095 rc = (rc == 0) ? msg.Get(prio) : rc;
00096 if (rc != 0) {
00097 XPDERR("kSetGroupPriority: problems parsing message; errno: "<<-rc);
00098 continue;
00099 }
00100
00101 mgr->SetGroupPriority(grp.c_str(), prio);
00102 } else {
00103 XPDERR("unknown message type: "<< msg.Type());
00104 }
00105
00106 if (mgr->SetNiceValues() != 0) {
00107 XPDERR("problem setting nice values ");
00108 }
00109 }
00110 }
00111
00112
00113 return (void *)0;
00114 }
00115
00116
00117 XrdProofdPriorityMgr::XrdProofdPriorityMgr(XrdProofdManager *mgr,
00118 XrdProtocol_Config *pi, XrdSysError *e)
00119 : XrdProofdConfig(pi->ConfigFN, e)
00120 {
00121
00122 XPDLOC(PMGR, "XrdProofdPriorityMgr")
00123
00124 fMgr = mgr;
00125 fSchedOpt = kXPD_sched_off;
00126 fPriorityMax = 20;
00127 fPriorityMin = 1;
00128
00129
00130 if (!fPipe.IsValid()) {
00131 TRACE(XERR, "unable to generate pipe for the priority poller");
00132 return;
00133 }
00134
00135
00136 RegisterDirectives();
00137 }
00138
00139
00140 static int DumpPriorityChanges(const char *, XrdProofdPriority *p, void *s)
00141 {
00142
00143 XPDLOC(PMGR, "DumpPriorityChanges")
00144
00145 XrdSysError *e = (XrdSysError *)s;
00146
00147 if (p && e) {
00148 XrdOucString msg;
00149 XPDFORM(msg, "priority will be changed by %d for user(s): %s",
00150 p->fDeltaPriority, p->fUser.c_str());
00151 TRACE(ALL, msg);
00152
00153 return 0;
00154 }
00155
00156
00157 return 1;
00158 }
00159
00160
00161 int XrdProofdPriorityMgr::Config(bool rcf)
00162 {
00163
00164
00165 XPDLOC(PMGR, "PriorityMgr::Config")
00166
00167
00168 if (XrdProofdConfig::Config(rcf) != 0) {
00169 XPDERR("problems parsing file ");
00170 return -1;
00171 }
00172
00173 XrdOucString msg;
00174 msg = (rcf) ? "re-configuring" : "configuring";
00175 TRACE(ALL, msg);
00176
00177
00178 if (fPriorities.Num() > 0) {
00179 fPriorities.Apply(DumpPriorityChanges, (void *)fEDest);
00180 } else {
00181 TRACE(ALL, "no priority changes requested");
00182 }
00183
00184
00185 if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->Num() > 1 && fSchedOpt != kXPD_sched_off) {
00186 XPDFORM(msg, "worker sched based on '%s' priorities",
00187 (fSchedOpt == kXPD_sched_central) ? "central" : "local");
00188 TRACE(ALL, msg);
00189 }
00190
00191 if (!rcf) {
00192
00193 pthread_t tid;
00194 if (XrdSysThread::Run(&tid, XrdProofdPriorityCron,
00195 (void *)this, 0, "PriorityMgr poller thread") != 0) {
00196 XPDERR("could not start poller thread");
00197 return 0;
00198 }
00199 TRACE(ALL, "poller thread started");
00200 }
00201
00202
00203 return 0;
00204 }
00205
00206
00207 void XrdProofdPriorityMgr::RegisterDirectives()
00208 {
00209
00210
00211 Register("schedopt", new XrdProofdDirective("schedopt", this, &DoDirectiveClass));
00212 Register("priority", new XrdProofdDirective("priority", this, &DoDirectiveClass));
00213 }
00214
00215
00216 int XrdProofdPriorityMgr::DoDirective(XrdProofdDirective *d,
00217 char *val, XrdOucStream *cfg, bool rcf)
00218 {
00219
00220 XPDLOC(PMGR, "PriorityMgr::DoDirective")
00221
00222 if (!d)
00223
00224 return -1;
00225
00226 if (d->fName == "priority") {
00227 return DoDirectivePriority(val, cfg, rcf);
00228 } else if (d->fName == "schedopt") {
00229 return DoDirectiveSchedOpt(val, cfg, rcf);
00230 }
00231 TRACE(XERR, "unknown directive: "<<d->fName);
00232 return -1;
00233 }
00234
00235
00236 void XrdProofdPriorityMgr::SetGroupPriority(const char *grp, int priority)
00237 {
00238
00239
00240 XrdProofGroup *g = fMgr->GroupsMgr()->GetGroup(grp);
00241 if (g)
00242 g->SetPriority((float)priority);
00243
00244
00245 SetSchedOpt(kXPD_sched_central);
00246
00247
00248 return;
00249 }
00250
00251
00252 static int ResetEntryPriority(const char *, XrdProofdSessionEntry *e, void *)
00253 {
00254
00255
00256 if (e) {
00257 e->SetPriority();
00258
00259 return 0;
00260 }
00261
00262
00263 return 1;
00264 }
00265
00266
00267 static int CreateActiveList(const char *, XrdProofdSessionEntry *e, void *s)
00268 {
00269
00270 XPDLOC(PMGR, "CreateActiveList")
00271
00272 XpdCreateActiveList_t *cal = (XpdCreateActiveList_t *)s;
00273
00274 XrdOucString emsg;
00275 if (e && cal) {
00276 XrdProofGroupMgr *gm = cal->fGroupMgr;
00277 std::list<XrdProofdSessionEntry *> *sorted = cal->fSortedList;
00278 if (gm) {
00279 XrdProofGroup *g = gm->GetGroup(e->fGroup.c_str());
00280 if (g) {
00281 float ef = g->FracEff() / g->Active();
00282 int nsrv = g->Active(e->fUser.c_str());
00283 if (nsrv > 0) {
00284 ef /= nsrv;
00285 e->fFracEff = ef;
00286 bool pushed = 0;
00287 std::list<XrdProofdSessionEntry *>::iterator ssvi;
00288 for (ssvi = sorted->begin() ; ssvi != sorted->end(); ssvi++) {
00289 if (ef >= (*ssvi)->fFracEff) {
00290 sorted->insert(ssvi, e);
00291 pushed = 1;
00292 break;
00293 }
00294 }
00295 if (!pushed)
00296 sorted->push_back(e);
00297
00298 return 0;
00299
00300 } else {
00301 emsg = "no srv sessions for active client";
00302 }
00303 } else {
00304 emsg = "group not found: "; emsg += e->fGroup.c_str();
00305 }
00306 } else {
00307 emsg = "group manager undefined";
00308 }
00309 } else {
00310 emsg = "input structure or entry undefined";
00311 }
00312
00313
00314 if (cal) cal->error = 1;
00315 TRACE(XERR, (e ? e->fUser : "---") << ": protocol error: "<<emsg);
00316 return 1;
00317 }
00318
00319
00320 int XrdProofdPriorityMgr::SetNiceValues(int opt)
00321 {
00322
00323
00324
00325
00326
00327
00328
00329 XPDLOC(PMGR, "PriorityMgr::SetNiceValues")
00330
00331 TRACE(REQ, "------------------- Start ----------------------");
00332
00333 TRACE(REQ, "opt: "<<opt);
00334
00335 if (!fMgr->GroupsMgr() || fMgr->GroupsMgr()->Num() <= 1 || !IsSchedOn()) {
00336
00337 TRACE(REQ, "------------------- End ------------------------");
00338 return 0;
00339 }
00340
00341
00342 int nact = fSessions.Num();
00343 TRACE(DBG, fMgr->GroupsMgr()->Num()<<" groups, " << nact<<" active sessions");
00344 if (nact <= 1) {
00345
00346 if (nact == 1)
00347 fSessions.Apply(ResetEntryPriority, 0);
00348
00349 TRACE(REQ, "------------------- End ------------------------");
00350 return 0;
00351 }
00352
00353 XrdSysMutexHelper mtxh(&fMutex);
00354
00355
00356 int rc = 0;
00357 if ((rc = fMgr->GroupsMgr()->SetEffectiveFractions(IsSchedOn())) != 0) {
00358
00359 TRACE(XERR, "failure from SetEffectiveFractions");
00360 rc = -1;
00361 }
00362
00363
00364 TRACE(DBG, "creating a list of active sessions sorted by decreasing effective fraction ");
00365 std::list<XrdProofdSessionEntry *> sorted;
00366 XpdCreateActiveList_t cal = { fMgr->GroupsMgr(), &sorted, 0 };
00367 if (rc == 0)
00368 fSessions.Apply(CreateActiveList, (void *)&cal);
00369
00370 if (!cal.error) {
00371
00372 int i = 0;
00373 std::list<XrdProofdSessionEntry *>::iterator ssvi;
00374 if (TRACING(HDBG)) {
00375 for (ssvi = sorted.begin() ; ssvi != sorted.end(); ssvi++)
00376 TRACE(HDBG, i++ <<" eff: "<< (*ssvi)->fFracEff);
00377 }
00378
00379 TRACE(DBG, "calculating nice values");
00380
00381
00382 ssvi = sorted.begin();
00383 float xmax = (*ssvi)->fFracEff;
00384 if (xmax > 0.) {
00385
00386 int nice = 20 - fPriorityMax;
00387 (*ssvi)->SetPriority(nice);
00388
00389 ssvi++;
00390 while (ssvi != sorted.end()) {
00391 int xpri = (int) ((*ssvi)->fFracEff / xmax * (fPriorityMax - fPriorityMin))
00392 + fPriorityMin;
00393 nice = 20 - xpri;
00394 TRACE(DBG, " --> nice value for client "<< (*ssvi)->fUser<<" is "<<nice);
00395 (*ssvi)->SetPriority(nice);
00396 ssvi++;
00397 }
00398 } else {
00399 TRACE(XERR, "negative or null max effective fraction: "<<xmax);
00400 rc = -1;
00401 }
00402 } else {
00403 TRACE(XERR, "failure from CreateActiveList");
00404 rc = -1;
00405 }
00406 TRACE(REQ, "------------------- End ------------------------");
00407
00408
00409 return rc;
00410 }
00411
00412
00413 int XrdProofdPriorityMgr::DoDirectivePriority(char *val, XrdOucStream *cfg, bool)
00414 {
00415
00416
00417 if (!val || !cfg)
00418
00419 return -1;
00420
00421
00422 int dp = strtol(val,0,10);
00423 XrdProofdPriority *p = new XrdProofdPriority("*", dp);
00424
00425 if ((val = cfg->GetWord()) && !strncmp(val,"if",2)) {
00426 if ((val = cfg->GetWord()) && val[0]) {
00427 p->fUser = val;
00428 }
00429 }
00430
00431 fPriorities.Rep(p->fUser.c_str(), p);
00432 return 0;
00433 }
00434
00435
00436 int XrdProofdPriorityMgr::DoDirectiveSchedOpt(char *val, XrdOucStream *cfg, bool)
00437 {
00438
00439 XPDLOC(PMGR, "PriorityMgr::DoDirectiveSchedOpt")
00440
00441 if (!val || !cfg)
00442
00443 return -1;
00444
00445 int pmin = -1;
00446 int pmax = -1;
00447 int opt = -1;
00448
00449 while (val && val[0]) {
00450 XrdOucString o = val;
00451 if (o.beginswith("min:")) {
00452
00453 o.replace("min:","");
00454 sscanf(o.c_str(), "%d", &pmin);
00455 } else if (o.beginswith("max:")) {
00456
00457 o.replace("max:","");
00458 sscanf(o.c_str(), "%d", &pmax);
00459 } else {
00460 if (o == "central")
00461 opt = kXPD_sched_central;
00462 else if (o == "local")
00463 opt = kXPD_sched_local;
00464 }
00465
00466 if (fMgr->Host() && cfg)
00467 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
00468 return 0;
00469
00470 val = cfg->GetWord();
00471 }
00472
00473
00474
00475 if (pmin > -1)
00476 fPriorityMin = (pmin >= 1 && pmin <= 40) ? pmin : fPriorityMin;
00477 if (pmax > -1)
00478 fPriorityMax = (pmax >= 1 && pmax <= 40) ? pmax : fPriorityMax;
00479 if (opt > -1)
00480 fSchedOpt = opt;
00481
00482
00483 if (fPriorityMin > fPriorityMax) {
00484 TRACE(XERR, "inconsistent value for fPriorityMin (> fPriorityMax) ["<<
00485 fPriorityMin << ", "<<fPriorityMax<<"] - correcting");
00486 fPriorityMin = fPriorityMax;
00487 }
00488
00489 return 0;
00490 }
00491
00492
00493 int XrdProofdPriorityMgr::RemoveSession(int pid)
00494 {
00495
00496
00497
00498 XrdOucString key; key += pid;
00499 return fSessions.Del(key.c_str());
00500 }
00501
00502
00503 int XrdProofdPriorityMgr::AddSession(const char *u, const char *g, int pid)
00504 {
00505
00506
00507
00508 int rc = 0;
00509 XrdOucString key; key += pid;
00510 XrdProofdSessionEntry *oldent = fSessions.Find(key.c_str());
00511 if (oldent) {
00512 rc = 1;
00513 fSessions.Rep(key.c_str(), new XrdProofdSessionEntry(u, g, pid));
00514 } else {
00515 fSessions.Add(key.c_str(), new XrdProofdSessionEntry(u, g, pid));
00516 }
00517
00518
00519 return rc;
00520 }
00521
00522
00523 int XrdProofdPriorityMgr::SetProcessPriority(int pid, const char *user, int &dp)
00524 {
00525
00526
00527 XPDLOC(PMGR, "PriorityMgr::SetProcessPriority")
00528
00529
00530 if (fPriorities.Num() > 0) {
00531 XrdProofdPriority *pu = fPriorities.Find(user);
00532 if (pu) {
00533 dp = pu->fDeltaPriority;
00534
00535 errno = 0;
00536 int priority = XPPM_NOPRIORITY;
00537 if ((priority = getpriority(PRIO_PROCESS, pid)) == -1 && errno != 0) {
00538 TRACE(XERR, "getpriority: errno: " << errno);
00539 return -errno;
00540 }
00541
00542 int newp = priority + dp;
00543 XrdProofUI ui;
00544 XrdProofdAux::GetUserInfo(geteuid(), ui);
00545 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
00546 if (XpdBadPGuard(pGuard, ui.fUid)) {
00547 TRACE(XERR, "could not get privileges");
00548 return -1;
00549 }
00550 TRACE(REQ, "got privileges ");
00551 errno = 0;
00552 if (setpriority(PRIO_PROCESS, pid, newp) != 0) {
00553 TRACE(XERR, "setpriority: errno: " << errno);
00554 return ((errno != 0) ? -errno : -1);
00555 }
00556 if ((getpriority(PRIO_PROCESS, pid)) != newp && errno != 0) {
00557 TRACE(XERR, "did not succeed: errno: " << errno);
00558 return -errno;
00559 }
00560 }
00561 }
00562
00563
00564 return 0;
00565 }
00566
00567
00568
00569
00570
00571 XrdProofdSessionEntry::XrdProofdSessionEntry(const char *u, const char *g, int pid)
00572 : fUser(u), fGroup(g), fPid(pid)
00573 {
00574
00575 XPDLOC(PMGR, "XrdProofdSessionEntry")
00576
00577 fPriority = XPPM_NOPRIORITY;
00578 fDefaultPriority = XPPM_NOPRIORITY;
00579 errno = 0;
00580 int prio = getpriority(PRIO_PROCESS, pid);
00581 if (errno != 0) {
00582 TRACE(XERR, " getpriority: errno: " << errno);
00583 return;
00584 }
00585 fDefaultPriority = prio;
00586 }
00587
00588
00589 XrdProofdSessionEntry::~XrdProofdSessionEntry()
00590 {
00591
00592
00593 SetPriority(fDefaultPriority);
00594 }
00595
00596
00597 int XrdProofdSessionEntry::SetPriority(int priority)
00598 {
00599
00600 XPDLOC(PMGR, "SessionEntry::SetPriority")
00601
00602 if (priority != XPPM_NOPRIORITY)
00603 priority = fDefaultPriority;
00604
00605 if (priority != fPriority) {
00606
00607 XrdProofUI ui;
00608 XrdProofdAux::GetUserInfo(geteuid(), ui);
00609 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
00610 if (XpdBadPGuard(pGuard, ui.fUid)) {
00611 TRACE(XERR, "could not get privileges");
00612 return -1;
00613 }
00614 errno = 0;
00615 if (setpriority(PRIO_PROCESS, fPid, priority) != 0) {
00616 TRACE(XERR, "setpriority: errno: " << errno);
00617 return -1;
00618 }
00619 fPriority = priority;
00620 }
00621
00622
00623 return 0;
00624 }