00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <list>
00040
00041 #include "XProofProtocol.h"
00042 #include "XrdProofdManager.h"
00043 #include "XrdProofdNetMgr.h"
00044 #include "XrdProofdProofServMgr.h"
00045 #include "XrdProofGroup.h"
00046 #include "XrdProofSched.h"
00047 #include "XrdProofdProofServ.h"
00048 #include "XrdProofWorker.h"
00049
00050 #include "XrdOuc/XrdOucString.hh"
00051 #include "XrdOuc/XrdOucStream.hh"
00052 #ifdef OLDXRDOUC
00053 # include "XrdSysToOuc.h"
00054 # include "XrdOuc/XrdOucError.hh"
00055 #else
00056 # include "XrdSys/XrdSysError.hh"
00057 #endif
00058
00059
00060 #include "XrdProofdTrace.h"
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089 void *XrdProofSchedCron(void *p)
00090 {
00091
00092
00093 XPDLOC(SCHED, "SchedCron")
00094
00095 XrdProofSched *sched = (XrdProofSched *)p;
00096 if (!(sched)) {
00097 TRACE(XERR, "undefined scheduler: cannot start");
00098 return (void *)0;
00099 }
00100
00101
00102 int lastcheck = time(0), ckfreq = sched->CheckFrequency(), deltat = 0;
00103 while(1) {
00104
00105 if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
00106 deltat = ckfreq;
00107 int pollRet = sched->Pipe()->Poll(deltat);
00108
00109 if (pollRet > 0) {
00110
00111 XpdMsg msg;
00112 int rc = 0;
00113 if ((rc = sched->Pipe()->Recv(msg)) != 0) {
00114 XPDERR("problems receiving message; errno: "<<-rc);
00115 continue;
00116 }
00117
00118 XrdOucString buf;
00119 if (msg.Type() == XrdProofSched::kReschedule) {
00120
00121 TRACE(ALL, "received kReschedule");
00122
00123
00124 sched->Reschedule();
00125
00126 } else {
00127
00128 TRACE(XERR, "unknown type: "<<msg.Type());
00129 continue;
00130 }
00131 } else {
00132
00133 TRACE(ALL, "running regular checks");
00134
00135 sched->Reschedule();
00136
00137 lastcheck = time(0);
00138 }
00139 }
00140
00141
00142 return (void *)0;
00143 }
00144
00145
00146 static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
00147 {
00148
00149
00150 return ((lhs && rhs &&
00151 lhs->GetNActiveSessions() < rhs->GetNActiveSessions()) ? 1 : 0);
00152 }
00153
00154
00155 int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
00156 {
00157
00158
00159 if (!d || !(d->fVal))
00160
00161 return -1;
00162
00163 return ((XrdProofSched *)d->fVal)->ProcessDirective(d, val, cfg, rcf);
00164 }
00165
00166
00167 XrdProofSched::XrdProofSched(const char *name,
00168 XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr,
00169 const char *cfn, XrdSysError *e)
00170 : XrdProofdConfig(cfn, e)
00171 {
00172
00173
00174 fValid = 1;
00175 fMgr = mgr;
00176 fGrpMgr = grpmgr;
00177 fNextWrk = 1;
00178 fEDest = e;
00179 fUseFIFO = 0;
00180 ResetParameters();
00181
00182 memset(fName, 0, kXPSMXNMLEN);
00183 if (name)
00184 memcpy(fName, name, kXPSMXNMLEN-1);
00185
00186
00187 RegisterDirectives();
00188 }
00189
00190
00191 void XrdProofSched::RegisterDirectives()
00192 {
00193
00194
00195 Register("schedparam", new XrdProofdDirective("schedparam", this, &DoDirectiveClass));
00196 Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
00197 }
00198
00199
00200 int XrdProofSched::DoDirective(XrdProofdDirective *d,
00201 char *val, XrdOucStream *cfg, bool rcf)
00202 {
00203
00204 XPDLOC(SCHED, "Sched::DoDirective")
00205
00206 if (!d)
00207
00208 return -1;
00209
00210 if (d->fName == "schedparam") {
00211 return DoDirectiveSchedParam(val, cfg, rcf);
00212 } else if (d->fName == "resource") {
00213 return DoDirectiveResource(val, cfg, rcf);
00214 }
00215 TRACE(XERR,"unknown directive: "<<d->fName);
00216 return -1;
00217 }
00218
00219
00220
00221 void XrdProofSched::ResetParameters()
00222 {
00223
00224
00225 fMaxSessions = -1;
00226 fMaxRunning = -1;
00227 fWorkerMax = -1;
00228 fWorkerSel = kSSORoundRobin;
00229 fOptWrksPerUnit = 1;
00230 fMinForQuery = 0;
00231 fNodesFraction = 0.5;
00232 fCheckFrequency = 30;
00233 }
00234
00235
00236 int XrdProofSched::Config(bool rcf)
00237 {
00238
00239
00240
00241 XPDLOC(SCHED, "Sched::Config")
00242
00243
00244 if (XrdProofdConfig::Config(rcf) != 0) {
00245 XPDERR("problems parsing file ");
00246 fValid = 0;
00247 return -1;
00248 }
00249
00250 int rc = 0;
00251
00252 XrdOucString msg;
00253
00254
00255 XPDFORM(msg, "maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
00256 fMaxSessions, fMaxRunning, fWorkerMax, fWorkerSel, fUseFIFO);
00257 TRACE(DBG, msg);
00258
00259 if (!rcf) {
00260
00261 pthread_t tid;
00262 if (XrdSysThread::Run(&tid, XrdProofSchedCron,
00263 (void *)this, 0, "Scheduler cron thread") != 0) {
00264 XPDERR("could not start cron thread");
00265 fValid = 0;
00266 return 0;
00267 }
00268 TRACE(ALL, "cron thread started");
00269 }
00270
00271
00272 return rc;
00273 }
00274
00275
00276 int XrdProofSched::Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
00277 {
00278
00279
00280 XPDDOM(SCHED)
00281
00282 if (xps->Enqueue(query) == 1) {
00283 std::list<XrdProofdProofServ *>::iterator ii;
00284 for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
00285 if ((*ii)->Status() == kXPD_running) break;
00286 }
00287 if (ii != fQueue.end()) {
00288 fQueue.insert(ii, xps);
00289 } else {
00290 fQueue.push_back(xps);
00291 }
00292 }
00293 if (TRACING(DBG)) DumpQueues("Enqueue");
00294
00295 return 0;
00296 }
00297
00298
00299 void XrdProofSched::DumpQueues(const char *prefix)
00300 {
00301
00302
00303 XPDLOC(SCHED, "DumpQueues")
00304
00305 TRACE(ALL," ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
00306 if (prefix) TRACE(ALL, " +++ Called from: "<<prefix);
00307 TRACE(ALL," +++ # of waiting sessions: "<<fQueue.size());
00308 std::list<XrdProofdProofServ *>::iterator ii;
00309 int i = 0;
00310 for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
00311 TRACE(ALL," +++ #"<<++i<<" client:"<< (*ii)->Client()<<" # of queries: "<< (*ii)->Queries()->size());
00312 }
00313 TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
00314
00315 return;
00316 }
00317
00318
00319 XrdProofdProofServ *XrdProofSched::FirstSession()
00320 {
00321
00322
00323 XPDDOM(SCHED)
00324
00325 if (fQueue.empty())
00326 return 0;
00327 XrdProofdProofServ *xps = fQueue.front();
00328 while (xps && !(xps->IsValid())) {
00329 fQueue.remove(xps);
00330 xps = fQueue.front();
00331 }
00332 if (TRACING(DBG)) DumpQueues("FirstSession");
00333
00334 return xps;
00335 }
00336
00337
00338 int XrdProofSched::GetNumWorkers(XrdProofdProofServ *xps)
00339 {
00340
00341 XPDLOC(SCHED, "Sched::GetNumWorkers")
00342
00343
00344 int nFreeCPUs = 0;
00345 std::list<XrdProofWorker *> *wrks = fMgr->NetMgr()->GetActiveWorkers();
00346 std::list<XrdProofWorker *>::iterator iter;
00347 for (iter = wrks->begin(); iter != wrks->end(); ++iter) {
00348 TRACE(DBG, (*iter)->fImage<<" : # act: "<<(*iter)->fProofServs.size());
00349 if ((*iter)->fType != 'M' && (*iter)->fType != 'S'
00350 && (int) (*iter)->fProofServs.size() < fOptWrksPerUnit)
00351
00352 nFreeCPUs += fOptWrksPerUnit - (*iter)->fProofServs.size();
00353 }
00354
00355 float priority = 1;
00356 XrdProofGroup *grp = 0;
00357 if (fGrpMgr && xps->Group())
00358 grp = fGrpMgr->GetGroup(xps->Group());
00359 if (grp) {
00360 std::list<XrdProofdProofServ *> *sessions = fMgr->SessionMgr()->ActiveSessions();
00361 std::list<XrdProofdProofServ *>::iterator sesIter;
00362 float summedPriority = 0;
00363 for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
00364 if ((*sesIter)->Group()) {
00365 XrdProofGroup *g = fGrpMgr->GetGroup((*sesIter)->Group());
00366 if (g)
00367 summedPriority += g->Priority();
00368 }
00369 }
00370 if (summedPriority > 0)
00371 priority = (grp->Priority() * sessions->size()) / summedPriority;
00372 }
00373
00374 int nWrks = (int)(nFreeCPUs * fNodesFraction * priority);
00375 if (nWrks <= fMinForQuery) {
00376 nWrks = fMinForQuery;
00377 } else if (nWrks >= (int) wrks->size()) {
00378 nWrks = wrks->size() - 1;
00379 }
00380 TRACE(DBG, nFreeCPUs<<" : "<< nWrks);
00381
00382 return nWrks;
00383 }
00384
00385
00386 int XrdProofSched::GetWorkers(XrdProofdProofServ *xps,
00387 std::list<XrdProofWorker *> *wrks,
00388 const char *querytag)
00389 {
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399 XPDLOC(SCHED, "Sched::GetWorkers")
00400
00401 int rc = 0;
00402
00403 TRACE(REQ, "enter: query tag: "<< ((querytag) ? querytag : ""));
00404
00405
00406 bool isDynamic = 1;
00407 if (querytag && !strncmp(querytag, XPD_GW_Static, strlen(XPD_GW_Static) - 1)) {
00408 isDynamic = 0;
00409 }
00410
00411
00412 if (querytag && xps && xps->Workers()->Num() > 0) {
00413 if (TRACING(REQ)) xps->DumpQueries();
00414 const char *cqtag = (xps->CurrentQuery()) ? xps->CurrentQuery()->GetTag() : "undef";
00415 TRACE(REQ, "current query tag: "<< cqtag );
00416 if (!strcmp(querytag, cqtag)) {
00417
00418 xps->RemoveQuery(cqtag);
00419 TRACE(REQ, "current assignment for session "<< xps->SrvPID() << " is valid");
00420
00421 return 1;
00422 }
00423 }
00424
00425
00426 if (!wrks)
00427 return -1;
00428
00429
00430
00431
00432 if (isDynamic) {
00433 if (fUseFIFO && xps->Workers()->Num() > 0) {
00434 if (!xps->GetQuery(querytag))
00435 Enqueue(xps, new XrdProofQuery(querytag));
00436 if (TRACING(DBG)) xps->DumpQueries();
00437
00438 TRACE(REQ, "session has already assigned workers: enqueue");
00439 return 2;
00440 }
00441 }
00442
00443
00444 std::list<XrdProofWorker *> *acws = 0;
00445
00446 if (!fMgr || !(acws = fMgr->NetMgr()->GetActiveWorkers()))
00447 return -1;
00448
00449
00450 XrdProofWorker *mst = acws->front();
00451 if (!mst)
00452 return -1;
00453
00454 if (fWorkerSel == kSSOLoadBased) {
00455
00456
00457
00458
00459 XrdProofWorker::Sort(acws, XpdWrkComp);
00460
00461
00462 int nw = GetNumWorkers(xps);
00463
00464 if (nw > 0) {
00465
00466 wrks->push_back(mst);
00467
00468 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
00469 while (nw--) {
00470 nxWrk++;
00471
00472
00473 wrks->push_back(*nxWrk);
00474 }
00475 } else {
00476
00477
00478 if (fUseFIFO) {
00479
00480
00481 if (!xps->GetQuery(querytag))
00482 Enqueue(xps, new XrdProofQuery(querytag));
00483 if (TRACING(DBG)) xps->DumpQueries();
00484
00485 TRACE(REQ, "no workers currently available: session enqueued");
00486 return 2;
00487 } else {
00488
00489 wrks->push_back(mst);
00490 }
00491 }
00492
00493 return 0;
00494 }
00495
00496
00497
00498 std::list<XrdProofWorker *> *acwseff = 0;
00499 int maxnum = (querytag && strcmp(querytag, XPD_GW_Static)) ? fMaxRunning : fMaxSessions;
00500 bool ok = 0;
00501 if (isDynamic) {
00502 if (maxnum > 0) {
00503 acwseff = new std::list<XrdProofWorker *>;
00504 std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
00505 if ((*xWrk)->Active() < maxnum) {
00506 acwseff->push_back(*xWrk);
00507 xWrk++;
00508 for (; xWrk != acws->end(); xWrk++) {
00509 if ((*xWrk)->Active() < maxnum) {
00510 acwseff->push_back(*xWrk);
00511 ok = 1;
00512 }
00513 }
00514 } else if (!fUseFIFO) {
00515 TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
00516 }
00517
00518 if (!ok) { delete acwseff; acwseff = 0; }
00519 acws = acwseff;
00520 }
00521 } else {
00522 if (maxnum > 0) {
00523
00524
00525 int nactsess = mst->GetNActiveSessions();
00526 TRACE(REQ, "act sess ... " << nactsess);
00527 if (nactsess < maxnum) {
00528 ok = 1;
00529 } else if (!fUseFIFO) {
00530 TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
00531 }
00532
00533 if (!ok) acws = acwseff;
00534 }
00535 }
00536
00537
00538 if (!acws || acws->size() <= 1) {
00539 if (fUseFIFO) {
00540
00541
00542 if (!xps->GetQuery(querytag))
00543 Enqueue(xps, new XrdProofQuery(querytag));
00544 if (TRACING(REQ)) xps->DumpQueries();
00545
00546 TRACE(REQ, "no workers currently available: session enqueued");
00547 return 2;
00548 } else {
00549 TRACE(XERR, "no worker available: do nothing");
00550 if (acwseff) { delete acwseff; acwseff = 0; }
00551 return -1;
00552 }
00553 }
00554
00555
00556 if (xps->Workers()->Num() > 0) {
00557
00558 return 1;
00559 }
00560
00561
00562 wrks->push_back(mst);
00563
00564 if (fWorkerMax > 0 && fWorkerMax < (int) acws->size()) {
00565
00566
00567 if (fWorkerSel == kSSORandom) {
00568
00569 static bool rndmInit = 0;
00570 if (!rndmInit) {
00571 const char *randdev = "/dev/urandom";
00572 int fd;
00573 unsigned int seed;
00574 if ((fd = open(randdev, O_RDONLY)) != -1) {
00575 if (read(fd, &seed, sizeof(seed)) != sizeof(seed)) {
00576 TRACE(XERR, "problems reading seed; errno: "<< errno);
00577 }
00578 srand(seed);
00579 close(fd);
00580 rndmInit = 1;
00581 }
00582 }
00583
00584 int nwt = acws->size();
00585 std::vector<int> walloc(nwt, 0);
00586 std::vector<XrdProofWorker *> vwrk(nwt);
00587
00588
00589 int namx = -1;
00590 int i = 1;
00591 std::list<XrdProofWorker *>::iterator iwk = acws->begin();
00592 iwk++;
00593 for ( ; iwk != acws->end(); iwk++) {
00594 vwrk[i] = *iwk;
00595 int na = (*iwk)->Active();
00596 printf(" %d", na);
00597 walloc[i] = na + walloc[i-1];
00598 i++;
00599 namx = (na > namx) ? na : namx;
00600 }
00601 printf("\n");
00602
00603 for (i = 1; i < nwt; i++) {
00604 if (namx > 0)
00605 walloc[i] = namx*i - walloc[i] + i;
00606 else
00607 walloc[i] = i;
00608 }
00609 int natot = walloc[nwt - 1];
00610
00611 int nw = fWorkerMax;
00612 while (nw--) {
00613
00614 int maxAtt = 10000, natt = 0;
00615 int iw = -1;
00616 while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
00617 int jw = rand() % natot;
00618 for (i = 0; i < nwt; i++) {
00619 if (jw < walloc[i]) {
00620
00621 int j = 0;
00622 for (j = i; j < nwt; j++) {
00623 if (walloc[j] > 0)
00624 walloc[j]--;
00625 }
00626 natot--;
00627 iw = i;
00628 break;
00629 }
00630 }
00631 }
00632
00633 if (iw > -1) {
00634
00635 wrks->push_back(vwrk[iw]);
00636 } else {
00637
00638 TRACE(XERR, "random generation failed");
00639 rc = -1;
00640 break;
00641 }
00642 }
00643
00644 } else {
00645 if (fNextWrk >= (int) acws->size())
00646 fNextWrk = 1;
00647 int iw = 0;
00648 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
00649 int nw = fWorkerMax;
00650 while (nw--) {
00651 while (iw != fNextWrk) {
00652 nxWrk++;
00653 iw++;
00654 }
00655
00656
00657 wrks->push_back(*nxWrk);
00658
00659 fNextWrk++;
00660 if (fNextWrk >= (int) acws->size()) {
00661 fNextWrk = 1;
00662 iw = 0;
00663 nxWrk = acws->begin();
00664 }
00665 }
00666 }
00667 } else {
00668
00669 std::list<XrdProofWorker *>::iterator iw = acws->begin();
00670 iw++;
00671 while (iw != acws->end()) {
00672
00673 wrks->push_back(*iw);
00674 iw++;
00675 }
00676 }
00677
00678
00679 if (wrks->size() <= 1) {
00680 TRACE(XERR, "no worker found: do nothing");
00681 rc = -1;
00682 }
00683
00684
00685 if (acwseff) { delete acwseff; acwseff = 0; }
00686
00687 return rc;
00688 }
00689
00690
00691 int XrdProofSched::Reschedule()
00692 {
00693
00694
00695
00696
00697
00698
00699 XPDDOM(SCHED)
00700
00701 if (fUseFIFO && TRACING(DBG)) DumpQueues("Reschedule");
00702
00703 if (!fQueue.empty()) {
00704
00705
00706 XrdProofdProofServ *xps = FirstSession();
00707 XrdOucString wrks;
00708
00709 XrdOucString qtag;
00710 if (xps && xps->CurrentQuery()) {
00711 qtag = xps->CurrentQuery()->GetTag();
00712 if (qtag.beginswith(XPD_GW_Static)) {
00713 qtag = XPD_GW_Static;
00714 qtag.replace(":","");
00715 }
00716 }
00717 if (fMgr->GetWorkers(wrks, xps, qtag.c_str()) < 0 ) {
00718
00719 return -1;
00720 } else {
00721
00722
00723 if (wrks.length() > 0 && wrks != XPD_GW_QueryEnqueued) {
00724
00725
00726 xps->Resume();
00727
00728 fQueue.remove(xps);
00729
00730
00731 if (xps->Queries()->size() > 1)
00732 fQueue.push_back(xps);
00733 if (TRACING(DBG)) DumpQueues("Reschedule 2");
00734 }
00735
00736 }
00737
00738 }
00739
00740 return 0;
00741 }
00742
00743
00744 int XrdProofSched::ExportInfo(XrdOucString &sbuf)
00745 {
00746
00747
00748
00749 const char *osel[] = { "all", "round-robin", "random", "load-based"};
00750 sbuf += "Selection: ";
00751 sbuf += osel[fWorkerSel+1];
00752 if (fWorkerSel > -1) {
00753 sbuf += ", max workers: ";
00754 sbuf += fWorkerMax; sbuf += " &";
00755 }
00756
00757
00758 std::list<XrdProofWorker *> *acws = fMgr->NetMgr()->GetActiveWorkers();
00759 std::list<XrdProofWorker *>::iterator iw;
00760 for (iw = acws->begin(); iw != acws->end(); ++iw) {
00761 sbuf += (*iw)->fType;
00762 sbuf += ": "; sbuf += (*iw)->fHost;
00763 if ((*iw)->fPort > -1) {
00764 sbuf += ":"; sbuf += (*iw)->fPort;
00765 } else
00766 sbuf += " ";
00767 sbuf += " sessions: "; sbuf += (*iw)->Active();
00768 sbuf += " &";
00769 }
00770
00771
00772 return 0;
00773 }
00774
00775
00776 int XrdProofSched::ProcessDirective(XrdProofdDirective *d,
00777 char *val, XrdOucStream *cfg, bool rcf)
00778 {
00779
00780 XPDLOC(SCHED, "Sched::ProcessDirective")
00781
00782 if (!d)
00783
00784 return -1;
00785
00786 if (d->fName == "schedparam") {
00787 return DoDirectiveSchedParam(val, cfg, rcf);
00788 } else if (d->fName == "resource") {
00789 return DoDirectiveResource(val, cfg, rcf);
00790 }
00791 TRACE(XERR, "unknown directive: "<<d->fName);
00792 return -1;
00793 }
00794
00795
00796 int XrdProofSched::DoDirectiveSchedParam(char *val, XrdOucStream *cfg, bool)
00797 {
00798
00799 XPDLOC(SCHED, "Sched::DoDirectiveSchedParam")
00800
00801 if (!val || !cfg)
00802
00803 return -1;
00804
00805
00806 while (val && val[0]) {
00807 XrdOucString s(val);
00808 if (s.beginswith("wmx:")) {
00809 s.replace("wmx:","");
00810 fWorkerMax = strtol(s.c_str(), (char **)0, 10);
00811 } else if (s.beginswith("mxsess:")) {
00812 s.replace("mxsess:","");
00813 fMaxSessions = strtol(s.c_str(), (char **)0, 10);
00814 } else if (s.beginswith("mxrun:")) {
00815 s.replace("mxrun:","");
00816 fMaxRunning = strtol(s.c_str(), (char **)0, 10);
00817 } else if (s.beginswith("selopt:")) {
00818 if (s.endswith("random"))
00819 fWorkerSel = kSSORandom;
00820 else if (s.endswith("load"))
00821 fWorkerSel = kSSOLoadBased;
00822 else
00823 fWorkerSel = kSSORoundRobin;
00824 } else if (s.beginswith("fraction:")) {
00825 s.replace("fraction:","");
00826 fNodesFraction = strtod(s.c_str(), (char **)0);
00827 } else if (s.beginswith("optnwrks:")) {
00828 s.replace("optnwrks:","");
00829 fOptWrksPerUnit = strtol(s.c_str(), (char **)0, 10);
00830 } else if (s.beginswith("minforquery:")) {
00831 s.replace("minforquery:","");
00832 fMinForQuery = strtol(s.c_str(), (char **)0, 10);
00833 } else if (s.beginswith("queue:")) {
00834 if (s.endswith("fifo")) {
00835 fUseFIFO = 1;
00836 }
00837 } else if (strncmp(val, "default", 7)) {
00838
00839 ResetParameters();
00840 break;
00841 }
00842 val = cfg->GetWord();
00843 }
00844
00845
00846
00847 if (fMaxSessions > 0) {
00848 fMinForQuery = 0;
00849
00850 if (fMaxRunning < 0 || fMaxRunning > fMaxSessions)
00851 fMaxRunning = fMaxSessions;
00852 }
00853
00854
00855 if (fWorkerSel == kSSOLoadBased && fMaxRunning > 0) {
00856 TRACE(ALL, "WARNING: in Load-Based mode the max number of sessions"
00857 " to be run is determined dynamically");
00858 }
00859
00860 return 0;
00861 }
00862
00863
00864 int XrdProofSched::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
00865 {
00866
00867
00868 if (!val || !cfg)
00869
00870 return -1;
00871
00872
00873 if (strncmp(val, "static", 6) && strncmp(val, "default", 7))
00874 return 0;
00875
00876 while ((val = cfg->GetWord()) && val[0]) {
00877 XrdOucString s(val);
00878 if (s.beginswith("wmx:")) {
00879 s.replace("wmx:","");
00880 fWorkerMax = strtol(s.c_str(), (char **)0, 10);
00881 } else if (s.beginswith("mxsess:")) {
00882 s.replace("mxsess:","");
00883 fMaxSessions = strtol(s.c_str(), (char **)0, 10);
00884 } else if (s.beginswith("selopt:")) {
00885 if (s.endswith("random"))
00886 fWorkerSel = kSSORandom;
00887 else
00888 fWorkerSel = kSSORoundRobin;
00889 }
00890 }
00891 return 0;
00892 }