00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "XrdProofdPlatform.h"
00022
00023 #ifdef OLDXRDOUC
00024 # include "XrdOuc/XrdOucError.hh"
00025 # include "XrdOuc/XrdOucLogger.hh"
00026 #else
00027 # include "XrdSys/XrdSysError.hh"
00028 # include "XrdSys/XrdSysLogger.hh"
00029 #endif
00030
00031 #include "Xrd/XrdBuffer.hh"
00032 #include "Xrd/XrdPoll.hh"
00033 #include "Xrd/XrdScheduler.hh"
00034 #include "XrdNet/XrdNet.hh"
00035 #include "XrdNet/XrdNetDNS.hh"
00036 #include "XrdNet/XrdNetPeer.hh"
00037 #include "XrdOuc/XrdOucRash.hh"
00038 #include "XrdOuc/XrdOucStream.hh"
00039 #include "XrdSys/XrdSysPriv.hh"
00040 #include "XrdSut/XrdSutAux.hh"
00041
00042 #include "XrdProofdClient.h"
00043 #include "XrdProofdClientMgr.h"
00044 #include "XrdProofdManager.h"
00045 #include "XrdProofdNetMgr.h"
00046 #include "XrdProofdPriorityMgr.h"
00047 #include "XrdProofdProofServMgr.h"
00048 #include "XrdProofdProtocol.h"
00049 #include "XrdProofGroup.h"
00050 #include "XrdProofSched.h"
00051 #include "XrdROOT.h"
00052
00053 #include <map>
00054
00055
00056 typedef struct {
00057 XrdProofGroupMgr *fGroupMgr;
00058 int *fNBroadcast;
00059 } XpdBroadcastPriority_t;
00060 typedef struct {
00061 XrdProofdManager *fMgr;
00062 XrdProofdClient *fClient;
00063 FILE *fEnv;
00064 } XpdWriteEnv_t;
00065
00066
00067 #include "XrdProofdTrace.h"
00068
00069 static XpdManagerCron_t fManagerCron;
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079 void *XrdProofdProofServCron(void *p)
00080 {
00081
00082
00083 XPDLOC(SMGR, "ProofServCron")
00084
00085 XpdManagerCron_t *mc = (XpdManagerCron_t *)p;
00086 XrdProofdProofServMgr *mgr = mc->fSessionMgr;
00087 XrdProofSched *sched = mc->fProofSched;
00088 if (!(mgr)) {
00089 TRACE(XERR, "undefined session manager: cannot start");
00090 return (void *)0;
00091 }
00092
00093
00094
00095 int quickcheckfreq = 5;
00096 int clnlostscale = 0;
00097
00098
00099 int lastrun = time(0);
00100 int lastcheck = lastrun, ckfreq = mgr->CheckFrequency(), waitt = 0;
00101 int deltat = ((int)(0.1*ckfreq) >= 1) ? (int)(0.1*ckfreq) : 1;
00102 int maxdelay = 5*ckfreq;
00103 mgr->SetNextSessionsCheck(lastcheck + ckfreq);
00104 TRACE(ALL, "next full sessions check in "<<ckfreq<<" secs");
00105 while(1) {
00106
00107
00108
00109 waitt = ckfreq - (time(0) - lastcheck);
00110 if (waitt > quickcheckfreq || waitt <= 0)
00111 waitt = quickcheckfreq;
00112 int pollRet = mgr->Pipe()->Poll(waitt);
00113
00114 if (pollRet > 0) {
00115
00116 XpdMsg msg;
00117 int rc = 0;
00118 if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
00119 TRACE(XERR, "problems receiving message; errno: "<<-rc);
00120 continue;
00121 }
00122
00123 if (msg.Type() == XrdProofdProofServMgr::kSessionRemoval) {
00124
00125 XrdOucString fpid;
00126 if ((rc = msg.Get(fpid)) != 0) {
00127 TRACE(XERR, "kSessionRemoval: problems receiving process ID (buf: '"<<
00128 msg.Buf()<<"'); errno: "<<-rc);
00129 continue;
00130 }
00131 XrdSysMutexHelper mhp(mgr->Mutex());
00132
00133 mgr->DeleteFromSessions(fpid.c_str());
00134
00135 mgr->MvSession(fpid.c_str());
00136
00137 if (sched) {
00138 if (sched->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
00139 TRACE(XERR, "kSessionRemoval: problem posting the scheduler pipe");
00140 }
00141 }
00142
00143 TRACE(REQ, "kSessionRemoval: session: "<<fpid<<
00144 " has been removed from the active list");
00145 } else if (msg.Type() == XrdProofdProofServMgr::kClientDisconnect) {
00146
00147
00148
00149 int pid = 0;
00150 if ((rc = msg.Get(pid)) != 0) {
00151 TRACE(XERR, "kClientDisconnect: problems receiving process ID (buf: '"<<
00152 msg.Buf()<<"'); errno: "<<-rc);
00153 continue;
00154 }
00155 TRACE(REQ, "kClientDisconnect: a client just disconnected: "<<pid);
00156
00157 mgr->DisconnectFromProofServ(pid);
00158 TRACE(DBG, "quick check of active sessions");
00159
00160 mgr->CheckActiveSessions(0);
00161 } else if (msg.Type() == XrdProofdProofServMgr::kCleanSessions) {
00162
00163 XpdSrvMgrCreateCnt cnt(mgr, XrdProofdProofServMgr::kCleanSessionsCnt);
00164 XrdOucString usr;
00165 rc = msg.Get(usr);
00166 int svrtype;
00167 rc = (rc == 0) ? msg.Get(svrtype) : rc;
00168 if (rc != 0) {
00169 TRACE(XERR, "kCleanSessions: problems parsing message (buf: '"<<
00170 msg.Buf()<<"'); errno: "<<-rc);
00171 continue;
00172 }
00173
00174 TRACE(REQ, "kCleanSessions: request for user: '"<<usr<<"', server type: "<<svrtype);
00175
00176 mgr->CleanClientSessions(usr.c_str(), svrtype);
00177
00178 mgr->CleanupLostProofServ();
00179 } else if (msg.Type() == XrdProofdProofServMgr::kProcessReq) {
00180
00181 mgr->ProcessSem()->Post();
00182 } else if (msg.Type() == XrdProofdProofServMgr::kChgSessionSt) {
00183
00184 mgr->BroadcastClusterInfo();
00185 } else {
00186 TRACE(XERR, "unknown type: "<<msg.Type());
00187 continue;
00188 }
00189 } else {
00190
00191
00192 int now = time(0);
00193
00194
00195 int cnt = mgr->CheckCounter(XrdProofdProofServMgr::kProcessCnt);
00196 if (cnt > 0) {
00197 if ((now - lastrun) < maxdelay) {
00198
00199 lastcheck = now + 5 - ckfreq;
00200 mgr->SetNextSessionsCheck(now + 5);
00201
00202 TRACE(ALL, "postponing sessions check (will retry in 5 secs)");
00203 continue;
00204 } else {
00205
00206 TRACE(ALL, "Max time without checks reached ("<<maxdelay<<"): force a session check");
00207
00208 mgr->UpdateCounter(XrdProofdProofServMgr::kProcessCnt, -cnt);
00209 }
00210 }
00211
00212 bool full = (now > mgr->NextSessionsCheck() - deltat) ? 1 : 0;
00213 if (full) {
00214
00215 mgr->CheckActiveSessions();
00216 mgr->CheckTerminatedSessions();
00217 if (clnlostscale <= 0) {
00218 mgr->CleanupLostProofServ();
00219 clnlostscale = 10;
00220 } else {
00221 clnlostscale--;
00222 }
00223
00224 int cursess = mgr->CurrentSessions(1);
00225 TRACE(ALL, cursess << " sessions are currently active");
00226
00227 lastrun = now;
00228 lastcheck = now;
00229 mgr->SetNextSessionsCheck(lastcheck + mgr->CheckFrequency());
00230
00231 TRACE(ALL, "next sessions check in "<<mgr->CheckFrequency()<<" secs");
00232 } else {
00233 TRACE(HDBG, "nothing to do; "<<mgr->NextSessionsCheck()-now<<" secs to full check");
00234 }
00235 }
00236 }
00237
00238
00239 return (void *)0;
00240 }
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 void *XrdProofdProofServRecover(void *p)
00251 {
00252
00253 XPDLOC(SMGR, "ProofServRecover")
00254
00255 XpdManagerCron_t *mc = (XpdManagerCron_t *)p;
00256 XrdProofdProofServMgr *mgr = mc->fSessionMgr;
00257 if (!(mgr)) {
00258 TRACE(XERR, "undefined session manager: cannot start");
00259 return (void *)0;
00260 }
00261
00262
00263 int rc = mgr->RecoverActiveSessions();
00264
00265
00266 if (rc > 0) {
00267 TRACE(ALL, "timeout recovering sessions: "<<rc<<" sessions not recovered");
00268 } else if (rc < 0) {
00269 TRACE(XERR, "some problem occured while recovering sessions");
00270 } else {
00271 TRACE(ALL, "recovering successfully terminated");
00272 }
00273
00274
00275 return (void *)0;
00276 }
00277
00278
00279 XrdProofdProofServMgr::XrdProofdProofServMgr(XrdProofdManager *mgr,
00280 XrdProtocol_Config *pi, XrdSysError *e)
00281 : XrdProofdConfig(pi->ConfigFN, e), fProcessSem(0)
00282 {
00283
00284 XPDLOC(SMGR, "XrdProofdProofServMgr")
00285
00286 fMgr = mgr;
00287 fLogger = pi->eDest->logger();
00288 fInternalWait = 10;
00289 fActiveSessions.clear();
00290 fShutdownOpt = 1;
00291 fShutdownDelay = 0;
00292 fReconnectTime = -1;
00293 fReconnectTimeOut = 300;
00294 fNextSessionsCheck = -1;
00295
00296 for (int i = 0; i < PSMMAXCNTS; i++) {
00297 fCounters[i] = 0;
00298 }
00299 fCurrentSessions = 0;
00300
00301
00302 fCheckFrequency = 30;
00303 fTerminationTimeOut = fCheckFrequency - 10;
00304 fVerifyTimeOut = 3 * fCheckFrequency;
00305 fRecoverTimeOut = 10;
00306 fCheckLost = 1;
00307 fParentExecs = "xproofd,xrootd";
00308
00309
00310 fRecoverClients = 0;
00311 fRecoverDeadline = -1;
00312
00313
00314 if (!fPipe.IsValid()) {
00315 TRACE(XERR, "unable to generate pipe for the session poller");
00316 return;
00317 }
00318
00319
00320 RegisterDirectives();
00321 }
00322
00323
00324 int XrdProofdProofServMgr::Config(bool rcf)
00325 {
00326
00327
00328 XPDLOC(SMGR, "ProofServMgr::Config")
00329
00330 XrdSysMutexHelper mhp(fEnvsMutex);
00331
00332 bool notify = (rcf) ? 0 : 1;
00333 if (rcf && ReadFile(0)) {
00334
00335 fProofServRCs.clear();
00336 fProofServEnvs.clear();
00337
00338 notify = 1;
00339 }
00340
00341
00342 if (XrdProofdConfig::Config(rcf) != 0) {
00343 TRACE(XERR, "problems parsing file ");
00344 return -1;
00345 }
00346
00347 XrdOucString msg;
00348 msg = (rcf) ? "re-configuring" : "configuring";
00349 if (notify) XPDPRT(msg);
00350
00351
00352 XPDFORM(msg, "setting internal timeout to %d secs", fInternalWait);
00353 if (notify) XPDPRT(msg);
00354
00355
00356 msg = "client sessions shutdown after disconnection";
00357 if (fShutdownOpt > 0) {
00358 XPDFORM(msg, "client sessions kept %sfor %d secs after disconnection",
00359 (fShutdownOpt == 1) ? "idle " : "", fShutdownDelay);
00360 }
00361 if (notify) XPDPRT(msg);
00362
00363 if (!rcf) {
00364
00365 fActiAdminPath = fMgr->AdminPath();
00366 fActiAdminPath += "/activesessions";
00367 fTermAdminPath = fMgr->AdminPath();
00368 fTermAdminPath += "/terminatedsessions";
00369
00370
00371 XrdProofUI ui;
00372 XrdProofdAux::GetUserInfo(fMgr->EffectiveUser(), ui);
00373 if (XrdProofdAux::AssertDir(fActiAdminPath.c_str(), ui, 1) != 0) {
00374 TRACE(XERR, "unable to assert the admin path: "<<fActiAdminPath);
00375 fActiAdminPath = "";
00376 return -1;
00377 }
00378 XPDPRT("active sessions admin path set to: "<<fActiAdminPath);
00379
00380 if (XrdProofdAux::AssertDir(fTermAdminPath.c_str(), ui, 1) != 0) {
00381 TRACE(XERR, "unable to assert the admin path "<<fTermAdminPath);
00382 fTermAdminPath = "";
00383 return -1;
00384 }
00385 XPDPRT("terminated sessions admin path set to "<<fTermAdminPath);
00386 }
00387
00388 if (notify) {
00389 XPDPRT("RC settings: "<< fProofServRCs.size());
00390 if (fProofServRCs.size() > 0) {
00391 std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
00392 for ( ; ircs != fProofServRCs.end(); ircs++) { (*ircs).Print("rc"); }
00393 }
00394 XPDPRT("ENV settings: "<< fProofServEnvs.size());
00395 if (fProofServEnvs.size() > 0) {
00396 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
00397 for ( ; ienvs != fProofServEnvs.end(); ienvs++) { (*ienvs).Print("env"); }
00398 }
00399 }
00400
00401 if (!rcf) {
00402
00403 int nr = -1;
00404 if ((nr = PrepareSessionRecovering()) < 0) {
00405 TRACE(XERR, "problems trying to recover active sessions");
00406 } else if (nr > 0) {
00407 XPDFORM(msg, "%d active sessions have been recovered", nr);
00408 XPDPRT(msg);
00409 }
00410
00411
00412 pthread_t tid;
00413
00414 fManagerCron.fClientMgr = fMgr->ClientMgr();
00415 fManagerCron.fSessionMgr = this;
00416 if (XrdSysThread::Run(&tid, XrdProofdProofServCron,
00417 (void *)&fManagerCron, 0, "ProofServMgr cron thread") != 0) {
00418 TRACE(XERR, "could not start cron thread");
00419 return 0;
00420 }
00421 XPDPRT("cron thread started");
00422 }
00423
00424
00425 return 0;
00426 }
00427
00428
00429 int XrdProofdProofServMgr::AddSession(XrdProofdProtocol *p, XrdProofdProofServ *s)
00430 {
00431
00432 XPDLOC(SMGR, "ProofServMgr::AddSession")
00433
00434 TRACE(REQ, "adding new active session ...");
00435
00436
00437 if (!s || !p || !p->Client()) {
00438 TRACE(XERR,"invalid inputs: "<<p<<", "<<s<<", "<<p->Client());
00439 return -1;
00440 }
00441 XrdProofdClient *c = p->Client();
00442
00443
00444 XrdOucString path;
00445 XPDFORM(path, "%s/%s.%s.%d", fActiAdminPath.c_str(), c->User(), c->Group(), s->SrvPID());
00446
00447
00448 XrdProofSessionInfo info(c, s);
00449 int rc = info.SaveToFile(path.c_str());
00450
00451 return rc;
00452 }
00453
00454
00455 bool XrdProofdProofServMgr::IsSessionSocket(const char *fpid)
00456 {
00457
00458
00459 XPDLOC(SMGR, "ProofServMgr::IsSessionSocket")
00460
00461 TRACE(REQ, "checking "<<fpid<<" ...");
00462
00463
00464 if (!fpid || strlen(fpid) <= 0) {
00465 TRACE(XERR, "invalid input: "<<fpid);
00466 return 0;
00467 }
00468
00469
00470 XrdOucString spath(fpid);
00471 if (!spath.endswith(".sock")) return 0;
00472 if (!spath.beginswith(fActiAdminPath.c_str())) {
00473
00474 XPDFORM(spath, "%s/%s", fActiAdminPath.c_str(), fpid);
00475 }
00476 XrdOucString apath = spath;
00477 apath.replace(".sock", "");
00478
00479
00480 struct stat st;
00481 if (stat(apath.c_str(), &st) != 0 && (errno == ENOENT)) {
00482
00483 if (CheckCounter(kCreateCnt) <= 0) {
00484 unlink(spath.c_str());
00485 TRACE(REQ, "missing admin path: removing "<<spath<<" ...");
00486 }
00487 }
00488
00489
00490 return 1;
00491 }
00492
00493
00494 int XrdProofdProofServMgr::MvSession(const char *fpid)
00495 {
00496
00497 XPDLOC(SMGR, "ProofServMgr::MvSession")
00498
00499 TRACE(REQ, "moving "<<fpid<<" ...");
00500
00501
00502 if (!fpid || strlen(fpid) <= 0) {
00503 TRACE(XERR, "invalid input: "<<fpid);
00504 return -1;
00505 }
00506
00507
00508 XrdOucString opath(fpid), npath;
00509 if (!opath.beginswith(fActiAdminPath.c_str())) {
00510
00511 XPDFORM(opath, "%s/%s", fActiAdminPath.c_str(), fpid);
00512 opath.replace(".status", "");
00513 } else {
00514
00515 opath.replace(".status", "");
00516 }
00517
00518 npath = opath;
00519 npath.replace(fActiAdminPath.c_str(), fTermAdminPath.c_str());
00520
00521
00522 XrdOucString spath = opath;
00523 spath += ".sock";
00524 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
00525 TRACE(XERR, "problems removing the UNIX socket path: "<<spath<<"; errno: "<<errno);
00526 spath.replace(".sock", ".status");
00527 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
00528 TRACE(XERR, "problems removing the status file: "<<spath<<"; errno: "<<errno);
00529
00530
00531 errno = 0;
00532 int rc = 0;
00533 if ((rc = rename(opath.c_str(), npath.c_str())) == 0 || (errno == ENOENT)) {
00534 if (!rc)
00535
00536 TouchSession(fpid, npath.c_str());
00537 return 0;
00538 }
00539
00540 TRACE(XERR, "session pid file cannot be moved: "<<opath<<
00541 "; target file: "<<npath<<"; errno: "<<errno);
00542 return -1;
00543 }
00544
00545
00546 int XrdProofdProofServMgr::RmSession(const char *fpid)
00547 {
00548
00549 XPDLOC(SMGR, "ProofServMgr::RmSession")
00550
00551 TRACE(REQ, "removing "<<fpid<<" ...");
00552
00553
00554 if (!fpid || strlen(fpid) <= 0) {
00555 TRACE(XERR, "invalid input: "<<fpid);
00556 return -1;
00557 }
00558
00559
00560 XrdOucString path;
00561 XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), fpid);
00562
00563
00564 if (unlink(path.c_str()) == 0)
00565 return 0;
00566
00567 TRACE(XERR, "session pid file cannot be unlinked: "<<
00568 path<<"; error: "<<errno);
00569 return -1;
00570 }
00571
00572
00573 int XrdProofdProofServMgr::TouchSession(const char *fpid, const char *fpath)
00574 {
00575
00576 XPDLOC(SMGR, "ProofServMgr::TouchSession")
00577
00578 TRACE(REQ, "touching "<<fpid<<", "<<fpath<<" ...");
00579
00580
00581 if (!fpid || strlen(fpid) <= 0) {
00582 TRACE(XERR, "invalid input: "<<fpid);
00583 return -1;
00584 }
00585
00586
00587 XrdOucString path(fpath);
00588 if (!fpath || strlen(fpath) == 0)
00589 XPDFORM(path, "%s/%s.status", fActiAdminPath.c_str(), fpid);
00590
00591
00592 if (utime(path.c_str(), 0) == 0)
00593 return 0;
00594
00595 TRACE(XERR, "time stamps for session pid file cannot be updated: "<<
00596 path<<"; error: "<<errno);
00597 return -1;
00598 }
00599
00600
00601 int XrdProofdProofServMgr::VerifySession(const char *fpid,
00602 int to, const char *fpath)
00603 {
00604
00605
00606
00607
00608 XPDLOC(SMGR, "ProofServMgr::VerifySession")
00609
00610
00611 if (!fpid || strlen(fpid) <= 0) {
00612 TRACE(XERR, "invalid input: "<<fpid);
00613 return -1;
00614 }
00615
00616
00617 XrdOucString path;
00618 if (fpath && strlen(fpath) > 0)
00619 XPDFORM(path, "%s/%s", fpath, fpid);
00620 else
00621 XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), fpid);
00622
00623
00624 int deltat = -1;
00625 bool checkmore = 1;
00626 while (checkmore) {
00627
00628 struct stat st;
00629 if (stat(path.c_str(), &st)) {
00630 TRACE(XERR, "session status file cannot be stat'ed: "<<
00631 path<<"; error: "<<errno);
00632 return -1;
00633 }
00634
00635 int xto = (to > 0) ? to : fVerifyTimeOut;
00636 deltat = time(0) - st.st_mtime;
00637 if (deltat > xto) {
00638 if (path.endswith(".status")) {
00639
00640 path.erase(path.rfind(".status"));
00641 } else {
00642
00643 TRACE(DBG, "admin path for session "<<fpid<<" hase not been touched"
00644 " since at least "<< xto <<" secs");
00645 return 1;
00646 }
00647 } else {
00648
00649 checkmore = 0;
00650 }
00651 }
00652
00653
00654 TRACE(DBG, "admin path for session "<<fpid<<" was touched " <<
00655 deltat <<" secs ago");
00656 return 0;
00657 }
00658
00659
00660 int XrdProofdProofServMgr::DeleteFromSessions(const char *fpid)
00661 {
00662
00663
00664 XPDLOC(SMGR, "ProofServMgr::DeleteFromSessions")
00665
00666 TRACE(REQ, "session: "<<fpid);
00667
00668
00669 if (!fpid || strlen(fpid) <= 0) {
00670 TRACE(XERR, "invalid input: "<<fpid);
00671 return -1;
00672 }
00673
00674 XrdOucString key = fpid;
00675 key.replace(".status", "");
00676 key.erase(0, key.rfind('.') + 1);
00677 XrdProofdProofServ *xps = 0;
00678 { XrdSysMutexHelper mhp(fMutex); xps = fSessions.Find(key.c_str()); }
00679 if (xps) {
00680
00681 XrdOucString msg;
00682 XPDFORM(msg, "session: %s terminated by peer", fpid);
00683 TRACE(DBG, msg);
00684
00685 int tp = xps->Reset(msg.c_str(), kXPD_wrkmortem);
00686
00687 XrdSysMutexHelper mhp(fMutex);
00688 if (tp == 1) fCurrentSessions--;
00689
00690 fActiveSessions.remove(xps);
00691 }
00692 int rc = -1;
00693 { XrdSysMutexHelper mhp(fMutex); rc = fSessions.Del(key.c_str()); }
00694 return rc;
00695 }
00696
00697
00698 int XrdProofdProofServMgr::PrepareSessionRecovering()
00699 {
00700
00701
00702
00703 XPDLOC(SMGR, "ProofServMgr::PrepareSessionRecovering")
00704
00705
00706 DIR *dir = opendir(fActiAdminPath.c_str());
00707 if (!dir) {
00708 TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
00709 return -1;
00710 }
00711 TRACE(REQ, "preparing recovering of active sessions ...");
00712
00713
00714 fRecoverClients = new std::list<XpdClientSessions *>;
00715 struct dirent *ent = 0;
00716 while ((ent = (struct dirent *)readdir(dir))) {
00717 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
00718
00719 XrdOucString rest, a;
00720 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
00721 if (!XPD_LONGOK(pid) || pid <= 0) continue;
00722 if (a.length() > 0) continue;
00723 bool rmsession = 1;
00724
00725 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
00726 if (ResolveSession(ent->d_name) == 0) {
00727 TRACE(DBG, "found active session: "<<pid);
00728 rmsession = 0;
00729 }
00730 }
00731
00732 if (rmsession)
00733 MvSession(ent->d_name);
00734 }
00735
00736 closedir(dir);
00737
00738
00739 int nrc = 0;
00740 { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
00741 if (nrc > 0) {
00742
00743 pthread_t tid;
00744
00745 fManagerCron.fClientMgr = fMgr->ClientMgr();
00746 fManagerCron.fSessionMgr = this;
00747 fManagerCron.fProofSched = fMgr->ProofSched();
00748 if (XrdSysThread::Run(&tid, XrdProofdProofServRecover, (void *)&fManagerCron,
00749 0, "ProofServMgr session recover thread") != 0) {
00750 TRACE(XERR, "could not start session recover thread");
00751 return 0;
00752 }
00753 XPDPRT("session recover thread started");
00754 } else {
00755
00756 if (fMgr->ClientMgr() && fMgr->ClientMgr()->GetNClients() <= 0)
00757 SetReconnectTime(0);
00758 }
00759
00760
00761 return 0;
00762 }
00763
00764
00765
00766 int XrdProofdProofServMgr::RecoverActiveSessions()
00767 {
00768
00769
00770
00771
00772
00773 XPDLOC(SMGR, "ProofServMgr::RecoverActiveSessions")
00774
00775 int rc = 0;
00776
00777 if (!fRecoverClients) {
00778
00779 TRACE(XERR, "recovering clients list undefined");
00780 return -1;
00781 }
00782
00783 int nrc = 0;
00784 { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
00785 TRACE(REQ, "start recovering of "<<nrc<<" clients");
00786
00787
00788 { XrdSysMutexHelper mhp(fRecoverMutex);
00789 fRecoverDeadline = time(0) + fRecoverTimeOut * nrc; }
00790
00791
00792 int nr = 0;
00793 XpdClientSessions *cls = 0;
00794 bool go = true;
00795 while (go) {
00796
00797
00798 { XrdSysMutexHelper mhp(fRecoverMutex); cls = fRecoverClients->front(); }
00799 if (cls) {
00800 SetReconnectTime();
00801 nr += Recover(cls);
00802
00803
00804 { XrdSysMutexHelper mhp(cls->fMutex);
00805 if (cls->fProofServs.size() <= 0) {
00806 XrdSysMutexHelper mhpr(fRecoverMutex);
00807 fRecoverClients->remove(cls);
00808
00809 if ((nrc = fRecoverClients->size()) <= 0)
00810 break;
00811 }
00812 }
00813 }
00814 TRACE(REQ, nrc<<" clients still to recover");
00815
00816
00817 { XrdSysMutexHelper mhp(fRecoverMutex);
00818 go = (time(0) < fRecoverDeadline) ? true : false; }
00819 }
00820
00821 SetReconnectTime(0);
00822
00823
00824 rc = 0;
00825 { XrdSysMutexHelper mhp(fRecoverMutex);
00826 if (fRecoverClients->size() > 0) {
00827 std::list<XpdClientSessions* >::iterator ii = fRecoverClients->begin();
00828 for (; ii != fRecoverClients->end(); ii++) {
00829 rc += (*ii)->fProofServs.size();
00830 }
00831 }
00832 }
00833
00834
00835 { XrdSysMutexHelper mhp(fRecoverMutex);
00836 fRecoverClients->clear();
00837 delete fRecoverClients;
00838 fRecoverClients = 0;
00839 fRecoverDeadline = -1;
00840 }
00841
00842
00843 return rc;
00844 }
00845
00846
00847 bool XrdProofdProofServMgr::IsClientRecovering(const char *usr, const char *grp,
00848 int &deadline)
00849 {
00850
00851
00852
00853 XPDLOC(SMGR, "ProofServMgr::IsClientRecovering")
00854
00855 if (!usr || !grp) {
00856 TRACE(XERR, "invalid inputs: usr: "<<usr<<", grp:"<<grp<<" ...");
00857 return false;
00858 }
00859
00860 deadline = -1;
00861 int rc = false;
00862 { XrdSysMutexHelper mhp(fRecoverMutex);
00863 if (fRecoverClients && fRecoverClients->size() > 0) {
00864 std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
00865 for (; ii != fRecoverClients->end(); ii++) {
00866 if ((*ii)->fClient && (*ii)->fClient->Match(usr, grp)) {
00867 rc = true;
00868 deadline = fRecoverDeadline;
00869 break;
00870 }
00871 }
00872 }
00873 }
00874 TRACE(DBG, "checking usr: "<<usr<<", grp:"<<grp<<" ... recovering? "<<
00875 rc<<", until: "<<deadline);
00876
00877
00878 return rc;
00879 }
00880
00881
00882 int XrdProofdProofServMgr::CheckActiveSessions(bool verify)
00883 {
00884
00885
00886
00887
00888 XPDLOC(SMGR, "ProofServMgr::CheckActiveSessions")
00889
00890 TRACE(REQ, "checking active sessions ...");
00891
00892
00893 DIR *dir = opendir(fActiAdminPath.c_str());
00894 if (!dir) {
00895 TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
00896 return -1;
00897 }
00898
00899
00900 struct dirent *ent = 0;
00901 while ((ent = (struct dirent *)readdir(dir))) {
00902 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
00903
00904
00905 if (strstr(ent->d_name, ".sock") && IsSessionSocket(ent->d_name)) continue;
00906
00907 XrdOucString rest, key, after;
00908 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, after);
00909
00910 if (after != "status") continue;
00911
00912 if (!XPD_LONGOK(pid) || pid <= 0) continue;
00913 key += pid;
00914
00915 XrdProofdProofServ *xps = 0;
00916 { XrdSysMutexHelper mhp(fMutex);
00917 xps = fSessions.Find(key.c_str());
00918 }
00919
00920 bool sessionalive = (VerifySession(ent->d_name) == 0) ? 1 : 0;
00921 bool rmsession = 0;
00922 if (xps) {
00923 if (!xps->IsValid() || !sessionalive) rmsession = 1;
00924 } else {
00925
00926
00927 if (sessionalive) continue;
00928 rmsession = 1;
00929 }
00930
00931
00932 bool oldvers = (xps && xps->ROOT() && xps->ROOT()->SrvProtVers() >= 18) ? 0 : 1;
00933
00934
00935
00936 int nc = -1;
00937 if (!rmsession)
00938 rmsession = xps->CheckSession(oldvers, IsReconnecting(),
00939 fShutdownOpt, fShutdownDelay, fMgr->ChangeOwn(), nc);
00940
00941
00942
00943
00944
00945 if (!rmsession && verify && !oldvers) {
00946 if (xps->VerifyProofServ(0) != 0) {
00947
00948 rmsession = 1;
00949 }
00950 }
00951 TRACE(REQ, "session: "<<ent->d_name<<"; nc: "<<nc<<"; rm: "<<rmsession);
00952
00953 if (rmsession)
00954 MvSession(ent->d_name);
00955 }
00956
00957 closedir(dir);
00958
00959
00960 return 0;
00961 }
00962
00963
00964 int XrdProofdProofServMgr::CheckTerminatedSessions()
00965 {
00966
00967
00968
00969 XPDLOC(SMGR, "ProofServMgr::CheckTerminatedSessions")
00970
00971 TRACE(REQ, "checking terminated sessions ...");
00972
00973
00974 DIR *dir = opendir(fTermAdminPath.c_str());
00975 if (!dir) {
00976 TRACE(XERR, "cannot open dir "<<fTermAdminPath<<" ; error: "<<errno);
00977 return -1;
00978 }
00979
00980
00981 int now = -1;
00982 struct dirent *ent = 0;
00983 while ((ent = (struct dirent *)readdir(dir))) {
00984 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
00985
00986 XrdOucString rest, a;
00987 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
00988 if (!XPD_LONGOK(pid) || pid <= 0) continue;
00989
00990
00991 now = (now > 0) ? now : time(0);
00992
00993
00994 XrdOucString path;
00995 XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), ent->d_name);
00996
00997
00998 struct stat st;
00999 int rcst = stat(path.c_str(), &st);
01000 TRACE(DBG, pid<<": rcst: "<<rcst<<", now - mtime: "<<now - st.st_mtime<<" secs")
01001 if ((now - st.st_mtime) > fTerminationTimeOut || rcst != 0) {
01002
01003 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01004
01005 XrdProofSessionInfo info(path.c_str());
01006 XrdProofUI ui;
01007 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
01008 XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn());
01009 } else {
01010
01011 RmSession(ent->d_name);
01012 }
01013 }
01014 }
01015
01016 closedir(dir);
01017
01018
01019 return 0;
01020 }
01021
01022
01023 int XrdProofdProofServMgr::CleanClientSessions(const char *usr, int srvtype)
01024 {
01025
01026
01027 XPDLOC(SMGR, "ProofServMgr::CleanClientSessions")
01028
01029 TRACE(REQ, "cleaning "<<usr<<" ...");
01030
01031
01032 bool all = (!usr || strlen(usr) <= 0 || !strcmp(usr, "all")) ? 1 : 0;
01033
01034
01035 XrdProofUI ui;
01036 if (!all)
01037 XrdProofdAux::GetUserInfo(usr, ui);
01038 XrdOucString path, rest, key, a;
01039
01040
01041 XrdSysRecMutex *mtx = 0;
01042 if (all) {
01043
01044 mtx = &fMutex;
01045 } else {
01046
01047 XrdProofdClient *c = fMgr->ClientMgr()->GetClient(usr);
01048 if (c) mtx = c->Mutex();
01049 }
01050
01051 std::list<int> tobedel;
01052 { XrdSysMutexHelper mtxh(mtx);
01053
01054
01055 DIR *dir = opendir(fTermAdminPath.c_str());
01056 if (!dir) {
01057 TRACE(XERR, "cannot open dir "<<fTermAdminPath<<" ; error: "<<errno);
01058 } else {
01059
01060 struct dirent *ent = 0;
01061 while ((ent = (struct dirent *)readdir(dir))) {
01062
01063 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
01064
01065 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
01066 if (!XPD_LONGOK(pid) || pid <= 0) continue;
01067
01068 XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), ent->d_name);
01069 XrdProofSessionInfo info(path.c_str());
01070
01071 if (!all && info.fUser != usr) continue;
01072
01073 if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype) continue;
01074
01075 if (all)
01076 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
01077
01078 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01079
01080 XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn());
01081 } else {
01082
01083 RmSession(ent->d_name);
01084 }
01085 }
01086
01087 closedir(dir);
01088 }
01089
01090
01091 dir = opendir(fActiAdminPath.c_str());
01092 if (!dir) {
01093 TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
01094 return -1;
01095 }
01096
01097
01098 struct dirent *ent = 0;
01099 while ((ent = (struct dirent *)readdir(dir))) {
01100
01101 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
01102
01103 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
01104 if (a == "status") continue;
01105 if (!XPD_LONGOK(pid) || pid <= 0) continue;
01106
01107 XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), ent->d_name);
01108 XrdProofSessionInfo info(path.c_str());
01109 if (!all && info.fUser != usr) continue;
01110
01111 if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype) continue;
01112
01113 if (all)
01114 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
01115
01116 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01117
01118 tobedel.push_back(pid);
01119
01120 XrdProofdAux::KillProcess(pid, 0, ui, fMgr->ChangeOwn());
01121 }
01122
01123 MvSession(ent->d_name);
01124 }
01125
01126 closedir(dir);
01127 }
01128
01129
01130 std::list<int>::iterator ii = tobedel.begin();
01131 while (ii != tobedel.end()) {
01132 XPDFORM(key, "%d", *ii);
01133 XrdSysMutexHelper mhp(fMutex);
01134 fSessions.Del(key.c_str());
01135 ii++;
01136 }
01137
01138
01139 return 0;
01140 }
01141
01142
01143 void XrdProofdProofServMgr::RegisterDirectives()
01144 {
01145
01146
01147
01148 Register("proofservmgr", new XrdProofdDirective("proofservmgr", this, &DoDirectiveClass));
01149 Register("putenv", new XrdProofdDirective("putenv", this, &DoDirectiveClass));
01150 Register("putrc", new XrdProofdDirective("putrc", this, &DoDirectiveClass));
01151 Register("shutdown", new XrdProofdDirective("shutdown", this, &DoDirectiveClass));
01152
01153 Register("intwait",
01154 new XrdProofdDirective("intwait", (void *)&fInternalWait, &DoDirectiveInt));
01155 Register("reconnto",
01156 new XrdProofdDirective("reconnto", (void *)&fReconnectTimeOut, &DoDirectiveInt));
01157
01158 Register("proofplugin",
01159 new XrdProofdDirective("proofplugin", (void *)&fProofPlugin, &DoDirectiveString));
01160 Register("proofservparents",
01161 new XrdProofdDirective("proofservparents", (void *)&fParentExecs, &DoDirectiveString));
01162 }
01163
01164
01165 int XrdProofdProofServMgr::DoDirective(XrdProofdDirective *d,
01166 char *val, XrdOucStream *cfg, bool rcf)
01167 {
01168
01169 XPDLOC(SMGR, "ProofServMgr::DoDirective")
01170
01171 if (!d)
01172
01173 return -1;
01174
01175 if (d->fName == "proofservmgr") {
01176 return DoDirectiveProofServMgr(val, cfg, rcf);
01177 } else if (d->fName == "putenv") {
01178 return DoDirectivePutEnv(val, cfg, rcf);
01179 } else if (d->fName == "putrc") {
01180 return DoDirectivePutRc(val, cfg, rcf);
01181 } else if (d->fName == "shutdown") {
01182 return DoDirectiveShutdown(val, cfg, rcf);
01183 }
01184 TRACE(XERR,"unknown directive: "<<d->fName);
01185 return -1;
01186 }
01187
01188
01189 int XrdProofdProofServMgr::DoDirectiveProofServMgr(char *val, XrdOucStream *cfg, bool rcf)
01190 {
01191
01192
01193 XPDLOC(SMGR, "ProofServMgr::DoDirectiveProofServMgr")
01194
01195 if (!val || !cfg)
01196
01197 return -1;
01198
01199 if (rcf)
01200
01201 return 0;
01202
01203 int checkfq = -1;
01204 int termto = -1;
01205 int verifyto = -1;
01206 int recoverto = -1;
01207 int checklost = 0;
01208
01209 while (val) {
01210 XrdOucString tok(val);
01211 if (tok.beginswith("checkfq:")) {
01212 tok.replace("checkfq:", "");
01213 checkfq = strtol(tok.c_str(), 0, 10);
01214 } else if (tok.beginswith("termto:")) {
01215 tok.replace("termto:", "");
01216 termto = strtol(tok.c_str(), 0, 10);
01217 } else if (tok.beginswith("verifyto:")) {
01218 tok.replace("verifyto:", "");
01219 verifyto = strtol(tok.c_str(), 0, 10);
01220 } else if (tok.beginswith("recoverto:")) {
01221 tok.replace("recoverto:", "");
01222 recoverto = strtol(tok.c_str(), 0, 10);
01223 } else if (tok.beginswith("checklost:")) {
01224 tok.replace("checklost:", "");
01225 checklost = strtol(tok.c_str(), 0, 10);
01226 }
01227
01228 val = cfg->GetWord();
01229 }
01230
01231
01232 if (fMgr->Host() && cfg)
01233 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
01234 return 0;
01235
01236
01237 fCheckFrequency = (XPD_LONGOK(checkfq) && checkfq > 0) ? checkfq : fCheckFrequency;
01238 fTerminationTimeOut = (XPD_LONGOK(termto) && termto > 0) ? termto : fTerminationTimeOut;
01239 fVerifyTimeOut = (XPD_LONGOK(verifyto) && (verifyto > fCheckFrequency + 1))
01240 ? verifyto : fVerifyTimeOut;
01241 fRecoverTimeOut = (XPD_LONGOK(recoverto) && recoverto > 0) ? recoverto : fRecoverTimeOut;
01242 if (XPD_LONGOK(checklost)) fCheckLost = (checklost != 0) ? 1 : 0;
01243
01244 XrdOucString msg;
01245 XPDFORM(msg, "checkfq: %d s, termto: %d s, verifyto: %d s, recoverto: %d s, checklost: %d",
01246 fCheckFrequency, fTerminationTimeOut, fVerifyTimeOut, fRecoverTimeOut, fCheckLost);
01247 TRACE(ALL, msg);
01248
01249 return 0;
01250 }
01251
01252
01253 int XrdProofdProofServMgr::DoDirectivePutEnv(char *val, XrdOucStream *cfg, bool)
01254 {
01255
01256
01257 if (!val)
01258
01259 return -1;
01260
01261
01262 XrdOucString users, groups, rcval, rcnam;
01263 int smi = -1, smx = -1, vmi = -1, vmx = -1;
01264 bool hex = 0;
01265 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
01266
01267
01268 int iequ = rcnam.find('=');
01269 if (iequ == STR_NPOS) return -1;
01270 rcnam.erase(iequ);
01271
01272
01273 FillEnvList(&fProofServEnvs, rcnam.c_str(), rcval.c_str(),
01274 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
01275
01276 return 0;
01277 }
01278
01279
01280 int XrdProofdProofServMgr::DoDirectivePutRc(char *val, XrdOucStream *cfg, bool)
01281 {
01282
01283
01284
01285
01286
01287
01288 if (!val || !cfg)
01289
01290 return -1;
01291
01292
01293 XrdOucString users, groups, rcval, rcnam;
01294 int smi = -1, smx = -1, vmi = -1, vmx = -1;
01295 bool hex = 0;
01296 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
01297
01298
01299 FillEnvList(&fProofServRCs, rcnam.c_str(), rcval.c_str(),
01300 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
01301
01302 return 0;
01303 }
01304
01305
01306 void XrdProofdProofServMgr::ExtractEnv(char *val, XrdOucStream *cfg,
01307 XrdOucString &users, XrdOucString &groups,
01308 XrdOucString &rcval, XrdOucString &rcnam,
01309 int &smi, int &smx, int &vmi, int &vmx, bool &hex)
01310 {
01311
01312
01313 XrdOucString ssvn, sver;
01314 int idash = -1;
01315 while (val && val[0]) {
01316 if (!strncmp(val, "u:", 2)) {
01317 users = val;
01318 users.erase(0,2);
01319 } else if (!strncmp(val, "g:", 2)) {
01320 groups = val;
01321 groups.erase(0,2);
01322 } else if (!strncmp(val, "s:", 2)) {
01323 ssvn = val;
01324 ssvn.erase(0,2);
01325 idash = ssvn.find('-');
01326 if (idash != STR_NPOS) {
01327 if (ssvn.isdigit(0, idash-1)) smi = ssvn.atoi(0, idash-1);
01328 if (ssvn.isdigit(idash+1)) smx = ssvn.atoi(idash+1);
01329 } else {
01330 if (ssvn.isdigit()) smi = ssvn.atoi();
01331 }
01332 } else if (!strncmp(val, "v:", 2)) {
01333 sver = val;
01334 sver.erase(0,2);
01335 hex = 0;
01336 if (sver.beginswith('x')) {
01337 hex = 1;
01338 sver.erase(0,1);
01339 }
01340 idash = sver.find('-');
01341 if (idash != STR_NPOS) {
01342 if (sver.isdigit(0, idash-1)) vmi = sver.atoi(0, idash-1);
01343 if (sver.isdigit(idash+1)) vmx = sver.atoi(idash+1);
01344 } else {
01345 if (sver.isdigit()) vmi = sver.atoi();
01346 }
01347 } else {
01348 if (rcval.length() > 0) {
01349 rcval += ' ';
01350 } else {
01351 rcnam = val;
01352 }
01353 rcval += val;
01354 }
01355 val = cfg->GetWord();
01356 }
01357
01358 return;
01359 }
01360
01361
01362 void XrdProofdProofServMgr::FillEnvList(std::list<XpdEnv> *el, const char *nam, const char *val,
01363 const char *usrs, const char *grps,
01364 int smi, int smx, int vmi, int vmx, bool hex)
01365 {
01366
01367 XPDLOC(SMGR, "ProofServMgr::FillEnvList")
01368
01369 if (!el) {
01370 TRACE(ALL, "env list undefined!");
01371 return;
01372 }
01373
01374 XrdOucString users(usrs), groups(grps);
01375
01376 if (vmi > 0) vmi = XpdEnv::ToVersCode(vmi, hex);
01377 if (vmx > 0) vmx = XpdEnv::ToVersCode(vmx, hex);
01378
01379 XpdEnv xpe(nam, val, users.c_str(), groups.c_str(), smi, smx, vmi, vmx);
01380 if (users.length() > 0) {
01381 XrdOucString usr;
01382 int from = 0;
01383 while ((from = users.tokenize(usr, from, ',')) != -1) {
01384 if (usr.length() > 0) {
01385 if (groups.length() > 0) {
01386 XrdOucString grp;
01387 int fromg = 0;
01388 while ((fromg = groups.tokenize(grp, from, ',')) != -1) {
01389 if (grp.length() > 0) {
01390 xpe.Reset(nam, val, usr.c_str(), grp.c_str(), smi, smx, vmi, vmx);
01391 el->push_back(xpe);
01392 }
01393 }
01394 } else {
01395 xpe.Reset(nam, val, usr.c_str(), 0, smi, smx, vmi, vmx);
01396 el->push_back(xpe);
01397 }
01398 }
01399 }
01400 } else {
01401 if (groups.length() > 0) {
01402 XrdOucString grp;
01403 int fromg = 0;
01404 while ((fromg = groups.tokenize(grp, fromg, ',')) != -1) {
01405 if (grp.length() > 0) {
01406 xpe.Reset(nam, val, 0, grp.c_str(), smi, smx, vmi, vmx);
01407 el->push_back(xpe);
01408 }
01409 }
01410 } else {
01411 el->push_back(xpe);
01412 }
01413 }
01414
01415 return;
01416 }
01417
01418
01419 int XrdProofdProofServMgr::DoDirectiveShutdown(char *val, XrdOucStream *cfg, bool)
01420 {
01421
01422
01423 if (!val || !cfg)
01424
01425 return -1;
01426
01427 int opt = -1;
01428 int delay = -1;
01429
01430
01431 int dp = strtol(val,0,10);
01432 if (dp >= 0 && dp <= 2)
01433 opt = dp;
01434
01435 if ((val = cfg->GetWord())) {
01436 int l = strlen(val);
01437 int f = 1;
01438 XrdOucString tval = val;
01439
01440 if (val[l-1] == 's') {
01441 val[l-1] = 0;
01442 } else if (val[l-1] == 'm') {
01443 f = 60;
01444 val[l-1] = 0;
01445 } else if (val[l-1] == 'h') {
01446 f = 3600;
01447 val[l-1] = 0;
01448 } else if (val[l-1] < 48 || val[l-1] > 57) {
01449 f = -1;
01450 }
01451 if (f > 0) {
01452 int de = strtol(val,0,10);
01453 if (de > 0)
01454 delay = de * f;
01455 }
01456 }
01457
01458
01459 if (fMgr->Host() && cfg)
01460 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
01461 return 0;
01462
01463
01464 fShutdownOpt = (opt > -1) ? opt : fShutdownOpt;
01465 fShutdownDelay = (delay > -1) ? delay : fShutdownDelay;
01466
01467 return 0;
01468 }
01469
01470
01471 int XrdProofdProofServMgr::Process(XrdProofdProtocol *p)
01472 {
01473
01474 XPDLOC(SMGR, "ProofServMgr::Process")
01475
01476 int rc = 1;
01477 XPD_SETRESP(p, "Process");
01478
01479 TRACEP(p, REQ, "enter: req id: " << p->Request()->header.requestid << " (" <<
01480 XrdProofdAux::ProofRequestTypes(p->Request()->header.requestid) << ")");
01481
01482 XrdSysMutexHelper mtxh(p->Client()->Mutex());
01483
01484
01485 XrdOucString emsg("Invalid request code: ");
01486
01487 int twait = 20;
01488
01489 if (Pipe()->Post(XrdProofdProofServMgr::kProcessReq, 0) != 0) {
01490 response->Send(kXR_ServerError,
01491 "ProofServMgr::Process: error posting internal pipe for authorization to proceed");
01492 return 0;
01493 }
01494 if (fProcessSem.Wait(twait) != 0) {
01495 response->Send(kXR_ServerError,
01496 "ProofServMgr::Process: timed-out waiting for authorization to proceed - retry later");
01497 return 0;
01498 }
01499
01500
01501 XpdSrvMgrCreateCnt cnt(this, kProcessCnt);
01502
01503 switch(p->Request()->header.requestid) {
01504 case kXP_create:
01505 return Create(p);
01506 case kXP_destroy:
01507 return Destroy(p);
01508 case kXP_attach:
01509 return Attach(p);
01510 case kXP_detach:
01511 return Detach(p);
01512 default:
01513 emsg += p->Request()->header.requestid;
01514 break;
01515 }
01516
01517
01518 response->Send(kXR_InvalidRequest, emsg.c_str());
01519 return 0;
01520 }
01521
01522
01523 int XrdProofdProofServMgr::Attach(XrdProofdProtocol *p)
01524 {
01525
01526 XPDLOC(SMGR, "ProofServMgr::Attach")
01527
01528 int psid = -1, rc = 0;
01529 XPD_SETRESP(p, "Attach");
01530
01531
01532 psid = ntohl(p->Request()->proof.sid);
01533 TRACEP(p, REQ, "psid: "<<psid<<", CID = "<<p->CID());
01534
01535
01536 XrdProofdClient *c = p->Client();
01537 if (!c) {
01538 TRACEP(p, XERR, "client instance undefined");
01539 response->Send(kXR_ServerError,"client instance undefined");
01540 return 0;
01541 }
01542
01543
01544
01545 XrdProofdProofServ *xps = 0;
01546 int now = time(0);
01547 int deadline = -1, defdeadline = now + fRecoverTimeOut;
01548 while ((deadline < 0) || (now < deadline)) {
01549 if (!(xps = c->GetServer(psid)) || !xps->IsValid()) {
01550
01551 if (!IsClientRecovering(c->User(), c->Group(), deadline)) {
01552
01553 TRACEP(p, XERR, "session ID not found: "<<psid);
01554 response->Send(kXR_InvalidRequest,"session ID not found");
01555 return 0;
01556 } else {
01557
01558 deadline = (deadline > 0) ? deadline : defdeadline;
01559
01560 sleep(1);
01561 now++;
01562 }
01563 } else {
01564
01565 break;
01566 }
01567 }
01568
01569 if (!xps || !xps->IsValid()) {
01570 TRACEP(p, XERR, "session ID not found: "<<psid);
01571 response->Send(kXR_InvalidRequest,"session ID not found");
01572 return 0;
01573 }
01574 TRACEP(p, DBG, "xps: "<<xps<<", status: "<< xps->Status());
01575
01576
01577 unsigned short sid;
01578 memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
01579
01580
01581
01582 XrdClientID *csid = xps->GetClientID(p->CID());
01583 csid->SetP(p);
01584 csid->SetSid(sid);
01585
01586
01587 if (!(xps->Parent()))
01588 xps->SetParent(csid);
01589
01590
01591 int protvers = (xps && xps->ROOT()) ? xps->ROOT()->SrvProtVers() : -1;
01592 if (p->ConnType() == kXPD_ClientMaster) {
01593
01594 XrdOucString dpu = fMgr->PoolURL();
01595 if (!dpu.endswith('/'))
01596 dpu += '/';
01597 dpu += fMgr->NameSpace();
01598 response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN,
01599 (void *) dpu.c_str(), dpu.length());
01600 } else
01601 response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN);
01602
01603
01604 if (xps->Status() == kXPD_running && xps->StartMsg()) {
01605 TRACEP(p, XERR, "sending start process message ("<<xps->StartMsg()->fSize<<" bytes)");
01606 response->Send(kXR_attn, kXPD_msg,
01607 xps->StartMsg()->fBuff, xps->StartMsg()->fSize);
01608 }
01609
01610
01611 return 0;
01612 }
01613
01614
01615 int XrdProofdProofServMgr::Create(XrdProofdProtocol *p)
01616 {
01617 // Handle a request to create a new session: fork a child which execs the proofserv program, exchange paths and setup status with it over two pipes, wait for its callback and register the session. Always returns 0; failures are reported to the client via 'response'.
01618 XPDLOC(SMGR, "ProofServMgr::Create")
01619
01620 int psid = -1, rc = 0;
01621 XPD_SETRESP(p, "Create");
01622
01623 TRACEP(p, DBG, "enter");
01624 XrdOucString msg;
01625
01626 XpdSrvMgrCreateGuard mcGuard;
01627
01628 // For client-master connections enforce the scheduler's limit on the number of sessions, if any (mxsess > 0)
01629 int mxsess = fMgr->ProofSched() ? fMgr->ProofSched()->MaxSessions() : -1;
01630 if (p->ConnType() == kXPD_ClientMaster && mxsess > 0) {
01631 XrdSysMutexHelper mhp(fMutex);
01632 int cursess = CurrentSessions();
01633 TRACEP(p,ALL," cursess: "<<cursess);
01634 if (mxsess <= cursess) {
01635 XPDFORM(msg, " ++++ Max number of sessions reached (%d) - please retry later ++++ \n", cursess);
01636 response->Send(kXR_attn, kXPD_srvmsg, (char *) msg.c_str(), msg.length());
01637 response->Send(kXP_TooManySess, "cannot start a new session");
01638 return 0;
01639 }
01640 // Attach the session counter to the guard; it is detached again with Set(0) further down, once the session is set up
01641 mcGuard.Set(&fCurrentSessions);
01642 }
01643
01644 // Counter object for kCreateCnt, alive until we leave this method (presumably counts the threads creating a session, as per the trace below - TODO confirm)
01645 XpdSrvMgrCreateCnt cnt(this, kCreateCnt);
01646 if (TRACING(DBG)) {
01647 int nc = CheckCounter(kCreateCnt);
01648 TRACEP(p, DBG, nc << " threads are creating a new session");
01649 }
01650
01651 // Get a free server object from the client and initialise it. NOTE(review): the result of GetFreeServObj() is used without a null check - confirm it cannot be null
01652 XrdProofdProofServ *xps = p->Client()->GetFreeServObj();
01653 xps->SetClient(p->Client()->User());
01654 xps->SetSrvType(p->ConnType());
01655 psid = xps->ID();
01656
01657 // Stream ID: the first two bytes of the header field
01658 unsigned short sid;
01659 memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
01660
01661 // Record stream ID and protocol in the client slot of this connection and make it the parent of the session
01662 XrdClientID *csid = xps->GetClientID(p->CID());
01663 csid->SetSid(sid);
01664 csid->SetP(p);
01665
01666 xps->SetParent(csid);
01667
01668 // Unmarshall the log level
01669 int loglevel = ntohl(p->Request()->proof.int1);
01670
01671 // Request payload: '<tag>|<key>:<value>|...' as parsed below
01672 char *buf = p->Argp()->buff;
01673 int len = p->Request()->proof.dlen;
01674
01675 // The session tag is what precedes the first '|'
01676 XrdOucString tag(buf,len);
01677
01678 TRACEP(p, DBG, "received buf: "<<tag);
01679
01680 tag.erase(tag.find('|'));
01681 xps->SetTag(tag.c_str());
01682 TRACEP(p, DBG, "tag: "<<tag);
01683
01684 // Ordinal number: from '|ord:<value>|' for master-worker and master-master connections, "0" otherwise
01685 XrdOucString ord = "0";
01686 if ((p->ConnType() == kXPD_MasterWorker) || (p->ConnType() == kXPD_MasterMaster)) {
01687 ord.assign(buf,0,len-1);
01688 int iord = ord.find("|ord:");
01689 if (iord != STR_NPOS) {
01690 ord.erase(0,iord+5);
01691 ord.erase(ord.find("|"));
01692 } else
01693 ord = "0";
01694 }
01695 xps->SetOrdinal(ord.c_str());
01696
01697 // Config file: from '|cf:<value>|', empty if absent
01698 XrdOucString cffile;
01699 cffile.assign(buf,0,len-1);
01700 int icf = cffile.find("|cf:");
01701 if (icf != STR_NPOS) {
01702 cffile.erase(0,icf+4);
01703 cffile.erase(cffile.find("|"));
01704 } else
01705 cffile = "";
01706
01707 // User environment settings: from '|envs:<value>|', empty if absent
01708 XrdOucString uenvs;
01709 uenvs.assign(buf,0,len-1);
01710 int ienv = uenvs.find("|envs:");
01711 if (ienv != STR_NPOS) {
01712 uenvs.erase(0,ienv+6);
01713 uenvs.erase(uenvs.find("|"));
01714 xps->SetUserEnvs(uenvs.c_str());
01715 } else
01716 uenvs = "";
01717
01718 // Time to wait for the callback; the default can be overridden with PROOF_INTWAIT=<n> in the user envs
01719 int intwait = fInternalWait;
01720 if (uenvs.length() > 0) {
01721 TRACEP(p, DBG, "user envs: "<<uenvs);
01722 int iiw = STR_NPOS;
01723 if ((iiw = uenvs.find("PROOF_INTWAIT=")) != STR_NPOS) {
01724 XrdOucString s(uenvs, iiw + strlen("PROOF_INTWAIT="));
01725 s.erase(s.find(','));
01726 if (s.isdigit()) {
01727 intwait = s.atoi();
01728 TRACEP(p, ALL, "startup internal wait set by user to "<<intwait);
01729 }
01730 }
01731 }
01732
01733 // The ROOT version to be used is the client's one
01734 xps->SetROOT(p->Client()->ROOT());
01735 XPDFORM(msg, "using ROOT version: %s", xps->ROOT()->Export());
01736 TRACEP(p, REQ, msg);
01737 if (p->ConnType() == kXPD_ClientMaster) {
01738 // Tell the client when it is not the default version
01739 if (p->Client()->ROOT() != fMgr->ROOTMgr()->DefaultVersion()) {
01740 XPDFORM(msg, "++++ Using NON-default ROOT version: %s ++++\n", xps->ROOT()->Export());
01741 response->Send(kXR_attn, kXPD_srvmsg, (char *) msg.c_str(), msg.length());
01742 }
01743 }
01744
01745 // Notify
01746 TRACEP(p, DBG, "{ord,cfg,psid,cid,log}: {"<<ord<<","<<cffile<<","<<psid
01747 <<","<<p->CID()<<","<<loglevel<<"}");
01748
01749
01750
01751 // Acquire the fork semaphore, Wait(10); it is posted by the parent right after the fork
01752 if (fForkSem.Wait(10) != 0) {
01753 xps->Reset();
01754 // Timeout acquiring the semaphore
01755 response->Send(kXP_ServerError, "timed-out acquiring fork semaphore");
01756 return 0;
01757 }
01758
01759 // Pipes for the setup phase: on 'fpc' the parent posts and the child receives, on 'fcp' the other way round
01760 XrdProofdPipe fpc, fcp;
01761 if (!(fpc.IsValid()) || !(fcp.IsValid())) {
01762 xps->Reset();
01763 // Failure creating the pipes. NOTE(review): fForkSem, acquired above, is not posted on this return path - confirm this is intended
01764 response->Send(kXP_ServerError,
01765 "unable to create pipes for communication during setup");
01766 return 0;
01767 }
01768
01769 // Fork; Fork() returning 0 is the child
01770 int pid = -1;
01771 TRACEP(p, FORK,"Forking external proofsrv");
01772 if (!(pid = fMgr->Sched()->Fork("proofsrv"))) {
01773
01774 // Child process: every exit from this branch is exit(1) or execv
01775 ProofServEnv_t in = {xps, loglevel, cffile.c_str(), "", "", "", "", ""};
01776 GetTagDirs(p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
01777 XPDFORM(in.fLogFile, "%s.log", in.fWrkDir.c_str());
01778 TRACE(FORK, "log file: "<<in.fLogFile);
01779
01780 XpdMsg xmsg;
01781 XrdOucString path, sockpath, emsg;
01782
01783 // Receive the admin path from the parent; a negative message type means the parent failed
01784 if (fpc.Poll() < 0) {
01785 TRACE(XERR, "error while polling to receive the admin path from parent - EXIT" );
01786 exit(1);
01787 }
01788 if (fpc.Recv(xmsg) != 0) {
01789 TRACE(XERR, "error reading message while waiting for the admin path from parent - EXIT" );
01790 exit(1);
01791 }
01792 if (xmsg.Type() < 0) {
01793 TRACE(XERR, "the parent failed to setup the admin path - EXIT" );
01794 exit(1);
01795 }
01796
01797 path = xmsg.Buf();
01798 xps->SetAdminPath(path.c_str(), 0);
01799 TRACE(FORK, "child: admin path: "<<path);
01800
01801 xmsg.Reset();
01802 // Receive the UNIX socket path from the parent, same conventions
01803 if (fpc.Poll() < 0) {
01804 TRACE(XERR, "error while polling to receive the sock path from parent - EXIT" );
01805 exit(1);
01806 }
01807 if (fpc.Recv(xmsg) != 0) {
01808 TRACE(XERR, "error reading message while waiting for the sock path from parent - EXIT" );
01809 exit(1);
01810 }
01811 if (xmsg.Type() < 0) {
01812 TRACE(XERR, "the parent failed to setup the sock path - EXIT" );
01813 exit(1);
01814 }
01815
01816 sockpath = xmsg.Buf();
01817 xps->SetUNIXSockPath(sockpath.c_str());
01818 TRACE(FORK, "child: UNIX sock path: "<<sockpath);
01819
01820 // Bind the logger, if any, to the session log file
01821 if (fLogger) fLogger->Bind(in.fLogFile.c_str());
01822
01823 // Give the log file to the client's uid / gid; a failure is only traced
01824 if (chown(in.fLogFile.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid) != 0)
01825 TRACE(XERR, "chown on '"<<in.fLogFile.c_str()<<"'; errno: "<<errno);
01826
01827 XrdOucString pmsg = "child process ";
01828 pmsg += (int) getpid();
01829 TRACE(FORK, pmsg);
01830
01831 // Ordinal and session tag are passed to SetUserOwnerships() only if DataDirOpts() contains 'M' (non-worker sessions) or 'W' (worker sessions)
01832 bool asserdatadir = 1;
01833 int srvtype = xps->SrvType();
01834 TRACE(ALL,"srvtype = "<< srvtype);
01835 if (xps->SrvType() != kXPD_Worker && !strchr(fMgr->DataDirOpts(), 'M')) {
01836 asserdatadir = 0;
01837 } else if (xps->SrvType() == kXPD_Worker && !strchr(fMgr->DataDirOpts(), 'W')) {
01838 asserdatadir = 0;
01839 }
01840 const char *pord = asserdatadir ? ord.c_str() : 0;
01841 const char *ptag = asserdatadir ? in.fSessionTag.c_str() : 0;
01842 if (SetUserOwnerships(p, pord, ptag) != 0) {
01843 emsg = "SetUserOwnerships did not return OK - EXIT";
01844 TRACE(XERR, emsg);
01845 if (fcp.Post(0, emsg.c_str()) != 0)
01846 TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
01847 exit(1);
01848 }
01849
01850 // Set up the user environment; a failure is posted to the parent with message type 0
01851 if (SetUserEnvironment(p) != 0) {
01852 emsg = "SetUserEnvironment did not return OK - EXIT";
01853 TRACE(XERR, emsg);
01854 if (fcp.Post(0, emsg.c_str()) != 0)
01855 TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
01856 exit(1);
01857 }
01858
01859 char *argvv[6] = {0};
01860 // Fourth argument: 'xpdpath:<admin path>' if the admin path is defined, else the parent PID
01861 char *sxpd = 0;
01862 if (fMgr && fMgr->AdminPath()) {
01863
01864 sxpd = new char[strlen(fMgr->AdminPath()) + strlen("xpdpath:") + 1];
01865 sprintf(sxpd, "xpdpath:%s", fMgr->AdminPath());
01866 } else {
01867
01868 sxpd = new char[10];
01869 sprintf(sxpd, "%d", getppid());
01870 }
01871
01872 // Log level as a string
01873 char slog[10] = {0};
01874 sprintf(slog, "%d", loglevel);
01875
01876 // Argument vector for execv
01877 argvv[0] = (char *) xps->ROOT()->PrgmSrv();
01878 argvv[1] = (char *)((p->ConnType() == kXPD_MasterWorker) ? "proofslave"
01879 : "proofserv");
01880 argvv[2] = (char *)"xpd";
01881 argvv[3] = (char *)sxpd;
01882 argvv[4] = (char *)slog;
01883 argvv[5] = 0;
01884
01885 // Set up the proofserv environment; a failure is posted to the parent with message type 0
01886 if (SetProofServEnv(p, (void *)&in) != 0) {
01887 emsg = "SetProofServEnv did not return OK - EXIT";
01888 TRACE(XERR, emsg);
01889 if (fcp.Post(0, emsg.c_str()) != 0)
01890 TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
01891 exit(1);
01892 }
01893 TRACE(FORK, (int)getpid() << ": proofserv env set up");
01894
01895
01896 // Setup done: post the log file path to the parent with message type 1
01897 if (fcp.Post(1, xps->Fileout()) != 0) {
01898 TRACE(XERR, "cannot write log file path to internal pipe; errno: "<<errno);
01899 exit(1);
01900 }
01901 TRACE(FORK, (int)getpid()<< ": log file path communicated");
01902
01903 // Unblock SIGUSR1 and SIGUSR2
01904 sigset_t myset;
01905 sigemptyset(&myset);
01906 sigaddset(&myset, SIGUSR1);
01907 sigaddset(&myset, SIGUSR2);
01908 pthread_sigmask(SIG_UNBLOCK, &myset, 0);
01909
01910 // Close the pipes on the child side
01911 fpc.Close();
01912 fcp.Close();
01913
01914 TRACE(FORK, (int)getpid()<<": user: "<<p->Client()->User()<<
01915 ", uid: "<<getuid()<<", euid:"<<geteuid());
01916 // Run the program
01917 execv(xps->ROOT()->PrgmSrv(), argvv);
01918
01919 // We get here only if execv failed
01920 TRACE(XERR, "returned from execv: bad, bad sign !!!");
01921 exit(1);
01922 }
01923
01924 // Parent process (or fork failure): release the fork semaphore
01925 fForkSem.Post();
01926
01927 // Fork failure
01928 if (pid < 0) {
01929 xps->Reset();
01930
01931 response->Send(kXP_ServerError, "could not fork agent");
01932 return 0;
01933 }
01934
01935 TRACEP(p, FORK,"Parent process: child is "<<pid);
01936 XrdOucString emsg;
01937
01938 // Delete the UNIX socket currently attached to the server object, if any
01939 if (xps->UNIXSock()) {
01940 TRACEP(p, FORK,"current UNIX sock: "<<xps->UNIXSock() <<", path: "<<xps->UNIXSockPath());
01941 xps->DeleteUNIXSock();
01942 }
01943
01944
01945 // Admin path '<active admin dir>/<user>.<group>.<pid>' and UNIX socket path '<sock dir>/xpd.<port>.<pid>' for the child
01946 XrdOucString path, sockpath;
01947 XPDFORM(path, "%s/%s.%s.%d", fActiAdminPath.c_str(),
01948 p->Client()->User(), p->Client()->Group(), pid);
01949
01950 XPDFORM(sockpath, "%s/xpd.%d.%d", fMgr->SockPathDir(), fMgr->Port(), pid);
01951 if (sockpath.length() > 100) {
01952 emsg += ": socket path very long (";
01953 emsg += sockpath.length();
01954 emsg += "): this may lead to stack corruption!";
01955 emsg += " Use xpd.sockpathdir to change it";
01956 }
01957 int pathrc = 0;
01958 if (!pathrc && !(pathrc = xps->SetAdminPath(path.c_str(), 1))) {
01959 // Admin path set up: communicate it to the child (message type 0)
01960 if ((pathrc = fpc.Post(0, path.c_str())) != 0)
01961 emsg += ": failed to communicating path to child";
01962 } else {
01963 emsg += ": failed to setup child admin path";
01964 // Tell the child about the failure (message type -1); note that 'pathrc' is overwritten with the result of this Post()
01965 if ((pathrc = fpc.Post(-1, path.c_str())) != 0)
01966 emsg += ": failed communicating failure to child";
01967 }
01968 // Create the UNIX socket on which the child will call back
01969 if (!pathrc) {
01970 xps->SetUNIXSockPath(sockpath.c_str());
01971 if ((pathrc = xps->CreateUNIXSock(fEDest)) != 0) {
01972
01973 emsg += ": failure creating UNIX socket on " ;
01974 emsg += sockpath;
01975 }
01976 }
01977 if (!pathrc) {
01978 TRACEP(p, FORK,"UNIX sock: "<<xps->UNIXSockPath());
01979 if ((pathrc = chown(sockpath.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid)) != 0) {
01980 emsg += ": failure changing ownership of the UNIX socket on " ;
01981 emsg += sockpath;
01982 emsg += "; errno: " ;
01983 emsg += errno;
01984 }
01985 }
01986
01987 if (!pathrc) {
01988 // Socket ready: communicate its path to the child (message type 0)
01989 if ((pathrc = fpc.Post(0, sockpath.c_str())) != 0)
01990 emsg += ": failed to communicating path to child";
01991 } else {
01992 emsg += ": failed to setup child admin path";
01993 // Tell the child about the failure (message type -1); 'pathrc' is overwritten with the result of this Post()
01994 if ((pathrc = fpc.Post(-1, sockpath.c_str())) != 0)
01995 emsg += ": failed communicating failure to child";
01996 }
01997 if (pathrc != 0) {
01998 // Failure: reset the server object, kill the child and notify the client
01999 xps->Reset();
02000 XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
02001 response->Send(kXP_ServerError, emsg.c_str());
02002 return 0;
02003 }
02004
02005 TRACEP(p, FORK, "waiting for client setup status ...");
02006
02007 emsg = "proofserv setup";
02008
02009
02010 // Wait for the setup status from the child: at most 10 calls to Poll(2), stopping at the first message or error
02011 int ntry = 10, prc = 0, rst = -1;
02012 while (prc == 0 && ntry--) {
02013
02014 if ((prc = fcp.Poll(2)) > 0) {
02015 // Something to read
02016 XpdMsg xmsg;
02017 if (fcp.Recv(xmsg) != 0) {
02018 emsg += ": error receiving message from pipe";
02019 prc = -1;
02020 break;
02021 }
02022 // The status is the message type: > 0 for success
02023 rst = xmsg.Type();
02024 // The buffer is the log file path on success, the error message otherwise
02025 XrdOucString xbuf = xmsg.Buf();
02026 if (xbuf.length() <= 0) {
02027 emsg = "error reading buffer {logfile, error message} from message received on the pipe";
02028 prc = -1;
02029 break;
02030 }
02031 if (rst > 0) {
02032 // Record the log file path
02033 xps->SetFileout(xbuf.c_str());
02034 // Session tag from the log path: drop from the last '/' on, then everything up to and including "session-"
02035 XrdOucString stag(xbuf);
02036 stag.erase(stag.rfind('/'));
02037 stag.erase(0, stag.find("session-") + strlen("session-"));
02038 xps->SetTag(stag.c_str());
02039
02040 } else {
02041 // The child reported a setup failure
02042 prc = -1;
02043 emsg += ": failed: ";
02044 emsg += xbuf;
02045 break;
02046 }
02047
02048 } else if (prc < 0) {
02049 emsg += ": error receive status-of-setup from pipe";
02050 break;
02051 } else {
02052 TRACEP(p, FORK, "receiving status-of-setup from pipe: waiting 2 s ..."<<pid);
02053 }
02054 }
02055
02056 // Close the pipes on the parent side
02057 fpc.Close();
02058 fcp.Close();
02059
02060 // Notify the client: prc <= 0 means failure (0 being the timeout)
02061 if (prc <= 0) {
02062 // Failure: reset the server object and kill the child
02063 if (prc == 0) emsg += ": timed-out receiving status-of-setup from pipe";
02064 emsg += ": failure setting up proofserv" ;
02065 xps->Reset();
02066 XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
02067 response->Send(kXP_ServerError, emsg.c_str());
02068 return 0;
02069
02070 } else {
02071 // Success: send back session ID, server protocol version and XPROOFD_VERSBIN
02072 if (p->ConnType() == kXPD_ClientMaster) {
02073 // Send also back the data pool URL: <pool URL>/<namespace>
02074 XrdOucString dpu = fMgr->PoolURL();
02075 if (!dpu.endswith('/'))
02076 dpu += '/';
02077 dpu += fMgr->NameSpace();
02078 response->SendI(psid, xps->ROOT()->SrvProtVers(), (kXR_int16)XPROOFD_VERSBIN,
02079 (void *) dpu.c_str(), dpu.length());
02080 } else
02081 response->SendI(psid, xps->ROOT()->SrvProtVers(), (kXR_int16)XPROOFD_VERSBIN);
02082 }
02083
02084
02085 TRACEP(p, FORK, "server launched: wait for callback ");
02086
02087 // Record the PID of the child
02088 xps->SetSrvPID(pid);
02089
02090 // Wait for the callback from the child, for at most 'intwait'
02091 if (Accept(xps, intwait, emsg) != 0) {
02092 // Failure: kill the child, reset the server object and notify the client
02093 if (XrdProofdAux::KillProcess(pid, 0, p->Client()->UI(), fMgr->ChangeOwn()) != 0)
02094 emsg += ": process could not be killed";
02095 else
02096 emsg += ": process killed";
02097
02098 xps->Reset();
02099
02100 TRACEP(p, XERR, "problems accepting callback: " <<emsg);
02101 response->Send(kXR_attn, kXPD_errmsg, (char *) emsg.c_str(), emsg.length());
02102 return 0;
02103 }
02104
02105 xps->SetGroup(p->Client()->Group());
02106
02107 // Set the process priority; 'dp' receives the applied change
02108 int dp = 0;
02109 if (fMgr->PriorityMgr()->SetProcessPriority(xps->SrvPID(),
02110 p->Client()->User(), dp) != 0) {
02111 TRACEP(p, XERR, "problems changing child process priority");
02112 } else if (dp > 0) {
02113 TRACEP(p, DBG, "priority of the child process changed by " << dp << " units");
02114 }
02115
02116 XrdClientID *cid = xps->Parent();
02117 TRACEP(p, FORK, "xps: "<<xps<<", ClientID: "<<(int *)cid<<" (sid: "<<sid<<")"<<" NClients: "<<xps->GetNClients(1));
02118
02119 // Record the session in the client sandbox; a failure is only traced
02120 if (p->Client()->Sandbox()->AddSession(xps->Tag()) == -1)
02121 TRACEP(p, REQ, "problems recording session in sandbox");
02122
02123 // Detach the session counter from the guard
02124 mcGuard.Set(0);
02125
02126 // Register the session, keyed by the child PID, in the session tables (under fMutex) and via AddSession()
02127 XrdOucString key; key += pid;
02128 { XrdSysMutexHelper mh(fMutex);
02129 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
02130 fActiveSessions.push_back(xps);
02131 }
02132 AddSession(p, xps);
02133
02134 // An invalid session at this point is only traced
02135 if (!xps->IsValid()) {
02136
02137 TRACEP(p, XERR, "PROOF session is invalid: protocol error? " <<emsg);
02138 }
02139
02140 // Over
02141 return 0;
02142 }
02143
02144
02145 int XrdProofdProofServMgr::ResolveSession(const char *fpid)
02146 {
02147
02148 XPDLOC(SMGR, "ProofServMgr::ResolveSession")
02149
02150 TRACE(REQ, "resolving "<< fpid<<" ...");
02151
02152
02153 if (!fpid || strlen(fpid)<= 0 || !(fMgr->ClientMgr()) || !fRecoverClients) {
02154 TRACE(XERR, "invalid inputs: "<<fpid<<", "<<fMgr->ClientMgr()<<
02155 ", "<<fRecoverClients);
02156 return -1;
02157 }
02158
02159
02160 XrdOucString path;
02161 XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), fpid);
02162
02163
02164 XrdProofSessionInfo si(path.c_str());
02165
02166
02167 if (si.fSrvProtVers < 18) {
02168 TRACE(DBG, "session does not support recovering: protocol "
02169 <<si.fSrvProtVers<<" < 18");
02170 return -1;
02171 }
02172
02173
02174 XrdProofdClient *c = fMgr->ClientMgr()->GetClient(si.fUser.c_str(), si.fGroup.c_str(),
02175 si.fUnixPath.c_str());
02176 if (!c) {
02177 TRACE(DBG, "client instance not initialized");
02178 return -1;
02179 }
02180
02181
02182 int psid = si.fID;
02183 XrdProofdProofServ *xps = c->GetServObj(psid);
02184 if (!xps) {
02185 TRACE(DBG, "server object not initialized");
02186 return -1;
02187 }
02188
02189
02190 si.FillProofServ(*xps, fMgr->ROOTMgr());
02191 if (xps->CreateUNIXSock(fEDest) != 0) {
02192
02193 TRACE(XERR,"failure creating UNIX socket on " << xps->UNIXSockPath());
02194 xps->Reset();
02195 return -1;
02196 }
02197
02198
02199 xps->SetValid(0);
02200
02201
02202 XrdSysMutexHelper mhp(fRecoverMutex);
02203 std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
02204 while (ii != fRecoverClients->end()) {
02205 if ((*ii)->fClient == c)
02206 break;
02207 ii++;
02208 }
02209 if (ii != fRecoverClients->end()) {
02210 (*ii)->fProofServs.push_back(xps);
02211 } else {
02212 XpdClientSessions *cl = new XpdClientSessions(c);
02213 cl->fProofServs.push_back(xps);
02214 fRecoverClients->push_back(cl);
02215 }
02216
02217
02218 return 0;
02219 }
02220
02221
02222 int XrdProofdProofServMgr::Recover(XpdClientSessions *cl)
02223 {
02224
02225 XPDLOC(SMGR, "ProofServMgr::Recover")
02226
02227 if (!cl) {
02228 TRACE(XERR, "invalid input!");
02229 return 0;
02230 }
02231
02232 TRACE(DBG, "client: "<< cl->fClient->User());
02233
02234 int nr = 0;
02235 XrdOucString emsg;
02236 XrdProofdProofServ *xps = 0;
02237 int nps = 0, npsref = 0;
02238 { XrdSysMutexHelper mhp(cl->fMutex); nps = cl->fProofServs.size(), npsref = nps; }
02239 while (nps--) {
02240
02241 { XrdSysMutexHelper mhp(cl->fMutex); xps = cl->fProofServs.front();
02242 cl->fProofServs.remove(xps); cl->fProofServs.push_back(xps); }
02243
02244
02245 if (Accept(xps, 1, emsg) != 0) {
02246 if (emsg == "timeout") {
02247 TRACE(DBG, "timeout while accepting callback");
02248 } else {
02249 TRACE(XERR, "problems accepting callback: "<<emsg);
02250 }
02251 } else {
02252
02253 XrdOucString key; key += xps->SrvPID();
02254 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
02255 fActiveSessions.push_back(xps);
02256 xps->Protocol()->SetAdminPath(xps->AdminPath());
02257
02258 { XrdSysMutexHelper mhp(cl->fMutex); cl->fProofServs.remove(xps); }
02259
02260 nr++;
02261
02262 if (TRACING(REQ)) {
02263 int pid = xps->SrvPID();
02264 int left = -1;
02265 { XrdSysMutexHelper mhp(cl->fMutex); left = cl->fProofServs.size(); }
02266 XPDPRT("session for "<<cl->fClient->User()<<"."<<cl->fClient->Group()<<
02267 " successfully recovered ("<<left<<" left); pid: "<<pid);
02268 }
02269 }
02270 }
02271
02272
02273 return nr;
02274 }
02275
02276
02277 int XrdProofdProofServMgr::Accept(XrdProofdProofServ *xps,
02278 int to, XrdOucString &msg)
02279 {
02280 // Accept, with timeout 'to', the callback of the session 'xps' on its UNIX socket; allocate a link for it, match and run the protocol handshake, attach the link to a poller, schedule it and record the protocol in 'xps'.
02281 // Returns 0 on success, -1 otherwise; 'msg' describes the failure, except when 'xps' or its socket is undefined, in which case it is left untouched.
02282 XPDLOC(SMGR, "ProofServMgr::Accept")
02283
02284 // 'go' is cleared by the first failing step, so that the following ones are skipped
02285 XrdNetPeer peerpsrv;
02286 XrdLink *linkpsrv = 0;
02287 XrdProtocol *xp = 0;
02288 int lnkopts = 0;
02289 bool go = 1;
02290
02291 // We need a session with a UNIX socket
02292 if (!xps || !xps->UNIXSock()) {
02293 TRACE(XERR, "session pointer undefined or socket invalid: "<<xps);
02294 return -1;
02295 }
02296 TRACE(REQ, "waiting for server callback for "<<to<<" secs ... on "<<xps->UNIXSockPath());
02297
02298 // Accept the connection; any failure here is reported as "timeout"
02299 if (go && !(xps->UNIXSock()->Accept(peerpsrv, XRDNET_NODNTRIM, to))) {
02300 msg = "timeout";
02301 go = 0;
02302 }
02303 if (go) {
02304 // Replace the peer name, if any, with the host name resolved for "localhost"
02305 if (peerpsrv.InetName) {
02306 char *ptmp = peerpsrv.InetName;
02307 peerpsrv.InetName = XrdNetDNS::getHostName("localhost");
02308 free(ptmp);
02309 }
02310 }
02311
02312 // Allocate a link for the peer
02313 if (go && !(linkpsrv = XrdLink::Alloc(peerpsrv, lnkopts))) {
02314 msg = "could not allocate network object: ";
02315 go = 0;
02316 }
02317
02318 if (go) {
02319 // Clear the peer buffer pointer (presumably now owned by the link - TODO confirm)
02320 peerpsrv.InetBuff = 0;
02321 TRACE(DBG, "accepted connection from " << peerpsrv.InetName);
02322 // Temporary protocol object, used only to call Match(); the protocol to be used is the one returned in 'xp'
02323 XrdProofdProtocol *p = new XrdProofdProtocol();
02324 if (!(xp = p->Match(linkpsrv))) {
02325 msg = "match failed: protocol error: ";
02326 go = 0;
02327 }
02328 delete p;
02329 }
02330
02331 if (go) {
02332 // The admin path of the protocol is '<session admin path>.status'
02333 XrdOucString apath(xps->AdminPath());
02334 apath += ".status";
02335 ((XrdProofdProtocol *)xp)->SetAdminPath(apath.c_str());
02336 // Run the handshake on the link
02337 if (xp->Process(linkpsrv) != 0) {
02338 msg = "handshake with internal link failed: ";
02339 go = 0;
02340 }
02341 }
02342
02343 // Attach the link to a poller
02344 if (go && !XrdPoll::Attach(linkpsrv)) {
02345 msg = "could not attach new internal link to poller: ";
02346 go = 0;
02347 }
02348
02349 if (!go) {
02350 // Failure: close the link, if it was allocated
02351 if (linkpsrv)
02352 linkpsrv->Close();
02353 return -1;
02354 }
02355
02356 // Tie the protocol to the link
02357 linkpsrv->setProtocol(xp);
02358
02359 TRACE(REQ, "Protocol "<<xp<<" attached to link "<<linkpsrv<<" ("<< peerpsrv.InetName <<")");
02360
02361 // Schedule the link
02362 fMgr->Sched()->Schedule((XrdJob *)linkpsrv);
02363
02364 // Record the protocol in the session
02365 xps->SetProtocol((XrdProofdProtocol *)xp);
02366
02367 // Done
02368 return 0;
02369 }
02370
02371
02372 int XrdProofdProofServMgr::Detach(XrdProofdProtocol *p)
02373 {
02374
02375 XPDLOC(SMGR, "ProofServMgr::Detach")
02376
02377 int psid = -1, rc = 0;
02378 XPD_SETRESP(p, "Detach");
02379
02380
02381 psid = ntohl(p->Request()->proof.sid);
02382 TRACEP(p, REQ, "psid: "<<psid);
02383
02384
02385 XrdProofdProofServ *xps = 0;
02386 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
02387 TRACEP(p, XERR, "session ID not found: "<<psid);
02388 response->Send(kXR_InvalidRequest,"session ID not found");
02389 return 0;
02390 }
02391 xps->FreeClientID(p->Pid());
02392
02393
02394 response->Send();
02395
02396 return 0;
02397 }
02398
02399
02400 int XrdProofdProofServMgr::Destroy(XrdProofdProtocol *p)
02401 {
02402
02403 XPDLOC(SMGR, "ProofServMgr::Destroy")
02404
02405 int psid = -1, rc = 0;
02406 XPD_SETRESP(p, "Destroy");
02407
02408
02409 psid = ntohl(p->Request()->proof.sid);
02410 TRACEP(p, REQ, "psid: "<<psid);
02411
02412 XrdOucString msg;
02413
02414
02415 XrdProofdProofServ *xpsref = 0;
02416 if (psid > -1) {
02417
02418 if (!p->Client() || !(xpsref = p->Client()->GetServer(psid))) {
02419 TRACEP(p, XERR, "reference session ID not found");
02420 response->Send(kXR_InvalidRequest,"reference session ID not found");
02421 return 0;
02422 }
02423 XPDFORM(msg, "session %d destroyed by %s", xpsref->SrvPID(), p->Link()->ID);
02424 } else {
02425 XPDFORM(msg, "all sessions destroyed by %s", p->Link()->ID);
02426 }
02427
02428
02429 p->Client()->TerminateSessions(kXPD_AnyServer, xpsref,
02430 msg.c_str(), Pipe(), fMgr->ChangeOwn());
02431
02432
02433 response->Send();
02434
02435
02436 return 0;
02437 }
02438
02439
02440 static int WriteSessEnvs(const char *, XpdEnv *env, void *s)
02441 {
02442
02443 XPDLOC(SMGR, "WriteSessEnvs")
02444
02445 XrdOucString emsg;
02446
02447 XpdWriteEnv_t *xwe = (XpdWriteEnv_t *)s;
02448
02449 if (env && xwe && xwe->fMgr && xwe->fClient && xwe->fEnv) {
02450 if (env->fEnv.length() > 0) {
02451
02452 xwe->fMgr->ResolveKeywords(env->fEnv, xwe->fClient);
02453
02454 char *ev = new char[env->fEnv.length()+1];
02455 strncpy(ev, env->fEnv.c_str(), env->fEnv.length());
02456 ev[env->fEnv.length()] = 0;
02457 putenv(ev);
02458 fprintf(xwe->fEnv, "%s\n", ev);
02459 TRACE(DBG, ev);
02460 }
02461
02462 return 0;
02463 } else {
02464 emsg = "some input undefined";
02465 }
02466
02467
02468 TRACE(XERR,"protocol error: "<<emsg);
02469 return 1;
02470 }
02471
02472
02473 int XrdProofdProofServMgr::SetProofServEnvOld(XrdProofdProtocol *p, void *input)
02474 {
02475
02476
02477 XPDLOC(SMGR, "ProofServMgr::SetProofServEnvOld")
02478
02479 char *ev = 0;
02480
02481
02482 if (!p || !p->Client() || !input) {
02483 TRACE(XERR, "at leat one input is invalid - cannot continue");
02484 return -1;
02485 }
02486
02487
02488 if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
02489 TRACE(XERR, "problems setting basic environment - exit");
02490 return -1;
02491 }
02492
02493 ProofServEnv_t *in = (ProofServEnv_t *)input;
02494
02495
02496 XrdProofdProofServ *xps = in->fPS;
02497 if (!xps) {
02498 TRACE(XERR, "unable to get instance of proofserv proxy");
02499 return -1;
02500 }
02501 int psid = xps->ID();
02502 TRACE(REQ, "psid: "<<psid<<", log: "<<in->fLogLevel);
02503
02504
02505 XrdOucString udir = p->Client()->Sandbox()->Dir();
02506 TRACE(DBG, "working dir for "<<p->Client()->User()<<" is: "<<udir);
02507
02508 ev = new char[strlen("ROOTPROOFSESSDIR=") + in->fWrkDir.length() + 2];
02509 sprintf(ev, "ROOTPROOFSESSDIR=%s", in->fWrkDir.c_str());
02510 putenv(ev);
02511 TRACE(DBG, ev);
02512
02513
02514 ev = new char[strlen("ROOTPROOFLOGLEVEL=")+5];
02515 sprintf(ev, "ROOTPROOFLOGLEVEL=%d", in->fLogLevel);
02516 putenv(ev);
02517 TRACE(DBG, ev);
02518
02519
02520 ev = new char[strlen("ROOTPROOFORDINAL=")+strlen(xps->Ordinal())+2];
02521 sprintf(ev, "ROOTPROOFORDINAL=%s", xps->Ordinal());
02522 putenv(ev);
02523 TRACE(DBG, ev);
02524
02525
02526 ev = new char[strlen("ROOTVERSIONTAG=")+strlen(p->Client()->ROOT()->Tag())+2];
02527 sprintf(ev, "ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
02528 putenv(ev);
02529 TRACE(DBG, ev);
02530
02531
02532 TRACE(DBG, "creating env file");
02533 XrdOucString envfile = in->fWrkDir;
02534 envfile += ".env";
02535 FILE *fenv = fopen(envfile.c_str(), "w");
02536 if (!fenv) {
02537 TRACE(XERR,
02538 "unable to open env file: "<<envfile);
02539 return -1;
02540 }
02541 TRACE(DBG, "environment file: "<< envfile);
02542
02543
02544 if (p->AuthProt()) {
02545
02546
02547 XrdOucString secenvs(getenv("XrdSecENVS"));
02548 if (secenvs.length() > 0) {
02549
02550 XrdOucString env;
02551 int from = 0;
02552 while ((from = secenvs.tokenize(env, from, ',')) != -1) {
02553 if (env.length() > 0) {
02554
02555 ev = new char[env.length()+1];
02556 strncpy(ev, env.c_str(), env.length());
02557 ev[env.length()] = 0;
02558 putenv(ev);
02559 fprintf(fenv, "%s\n", ev);
02560 TRACE(DBG, ev);
02561 }
02562 }
02563 }
02564
02565
02566 XrdSecCredentials *creds = p->AuthProt()->getCredentials();
02567 if (creds) {
02568 int lev = strlen("XrdSecCREDS=")+creds->size;
02569 ev = new char[lev+1];
02570 strcpy(ev, "XrdSecCREDS=");
02571 memcpy(ev+strlen("XrdSecCREDS="), creds->buffer, creds->size);
02572 ev[lev] = 0;
02573 putenv(ev);
02574 TRACE(DBG, "XrdSecCREDS set");
02575
02576
02577 if (!strncmp(p->AuthProt()->Entity.prot, "pwd", 3)) {
02578 XrdOucString credsdir = udir;
02579 credsdir += "/.creds";
02580
02581 if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
02582 if (SaveAFSkey(creds, credsdir.c_str(), p->Client()->UI()) == 0) {
02583 ev = new char[strlen("ROOTPROOFAFSCREDS=")+credsdir.length()+strlen("/.afs")+2];
02584 sprintf(ev, "ROOTPROOFAFSCREDS=%s/.afs", credsdir.c_str());
02585 putenv(ev);
02586 fprintf(fenv, "ROOTPROOFAFSCREDS has been set\n");
02587 TRACE(DBG, ev);
02588 } else {
02589 TRACE(DBG, "problems in saving AFS key");
02590 }
02591 } else {
02592 TRACE(XERR, "unable to create creds dir: "<<credsdir);
02593 return -1;
02594 }
02595 }
02596 }
02597 }
02598
02599
02600 fprintf(fenv, "ROOTSYS=%s\n", xps->ROOT()->Dir());
02601
02602
02603 fprintf(fenv, "ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
02604
02605
02606 fprintf(fenv, "ROOTTMPDIR=%s\n", fMgr->TMPdir());
02607
02608
02609 fprintf(fenv, "ROOTXPDPORT=%d\n", fMgr->Port());
02610
02611
02612 fprintf(fenv, "ROOTPROOFWORKDIR=%s\n", udir.c_str());
02613
02614
02615 fprintf(fenv, "ROOTPROOFSESSIONTAG=%s\n", in->fSessionTag.c_str());
02616
02617
02618 if (fMgr->NetMgr()->WorkerUsrCfg())
02619 fprintf(fenv, "ROOTUSEUSERCFG=1\n");
02620
02621
02622 fprintf(fenv, "ROOTOPENSOCK=%s\n", xps->UNIXSockPath());
02623
02624
02625 fprintf(fenv, "ROOTENTITY=%s@%s\n", p->Client()->User(), p->Link()->Host());
02626
02627
02628 fprintf(fenv, "ROOTSESSIONID=%d\n", psid);
02629
02630
02631 fprintf(fenv, "ROOTCLIENTID=%d\n", p->CID());
02632
02633
02634 fprintf(fenv, "ROOTPROOFCLNTVERS=%d\n", p->ProofProtocol());
02635
02636
02637 fprintf(fenv, "ROOTPROOFORDINAL=%s\n", xps->Ordinal());
02638
02639
02640 if (getenv("ROOTVERSIONTAG"))
02641 fprintf(fenv, "ROOTVERSIONTAG=%s\n", getenv("ROOTVERSIONTAG"));
02642
02643
02644 if (in->fCfg.length() > 0)
02645 fprintf(fenv, "ROOTPROOFCFGFILE=%s\n", in->fCfg.c_str());
02646
02647
02648 fprintf(fenv, "ROOTPROOFLOGFILE=%s\n", in->fLogFile.c_str());
02649 xps->SetFileout(in->fLogFile.c_str());
02650
02651
02652 { XrdSysMutexHelper mhp(fEnvsMutex);
02653 if (fProofServEnvs.size() > 0) {
02654
02655 XrdOucHash<XpdEnv> sessenvs;
02656 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
02657 for ( ; ienvs != fProofServEnvs.end(); ienvs++) {
02658 int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
02659 p->Client()->ROOT()->SvnRevision(),
02660 p->Client()->ROOT()->VersionCode());
02661 if (envmatch >= 0) {
02662 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
02663 if (env) {
02664 int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
02665 p->Client()->ROOT()->SvnRevision(),
02666 p->Client()->ROOT()->VersionCode());
02667 if (envmatch > envmtcex) {
02668
02669 env = &(*ienvs);
02670 sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
02671 }
02672 } else {
02673
02674 env = &(*ienvs);
02675 sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
02676 }
02677 TRACE(HDBG, "Adding: "<<(*ienvs).fEnv);
02678 }
02679 }
02680 XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv};
02681 sessenvs.Apply(WriteSessEnvs, (void *)&xpwe);
02682 }
02683 }
02684
02685
02686 if (xps->UserEnvs() &&
02687 strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),"=")) {
02688
02689 XrdOucString ue = xps->UserEnvs();
02690 XrdOucString env, namelist;
02691 int from = 0, ieq = -1;
02692 while ((from = ue.tokenize(env, from, ',')) != -1) {
02693 if (env.length() > 0 && (ieq = env.find('=')) != -1) {
02694
02695 ResolveKeywords(env, in);
02696 ev = new char[env.length()+1];
02697 strncpy(ev, env.c_str(), env.length());
02698 ev[env.length()] = 0;
02699 putenv(ev);
02700 fprintf(fenv, "%s\n", ev);
02701 TRACE(DBG, ev);
02702 env.erase(ieq);
02703 if (namelist.length() > 0)
02704 namelist += ',';
02705 namelist += env;
02706 }
02707 }
02708
02709 ev = new char[strlen("PROOF_ALLVARS=") + namelist.length() + 2];
02710 sprintf(ev, "PROOF_ALLVARS=%s", namelist.c_str());
02711 putenv(ev);
02712 fprintf(fenv, "%s\n", ev);
02713 TRACE(DBG, ev);
02714 }
02715
02716
02717 fclose(fenv);
02718
02719
02720 TRACE(DBG, "creating symlink");
02721 XrdOucString syml = udir;
02722 if (p->ConnType() == kXPD_MasterWorker)
02723 syml += "/last-worker-session";
02724 else
02725 syml += "/last-master-session";
02726 if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
02727 TRACE(XERR, "problems creating symlink to last session (errno: "<<errno<<")");
02728 }
02729
02730
02731 TRACE(DBG, "done");
02732 return 0;
02733 }
02734
02735
02736 int XrdProofdProofServMgr::SetProofServEnv(XrdProofdManager *mgr, XrdROOT *r)
02737 {
02738
02739 XPDLOC(SMGR, "ProofServMgr::SetProofServEnv")
02740
02741 char *ev = 0;
02742
02743 TRACE(REQ, "ROOT dir: "<< (r ? r->Dir() : "*** undef ***"));
02744
02745 if (r) {
02746 char *libdir = (char *) r->LibDir();
02747 char *ldpath = 0;
02748 if (mgr->BareLibPath() && strlen(mgr->BareLibPath()) > 0) {
02749 ldpath = new char[32 + strlen(libdir) + strlen(mgr->BareLibPath())];
02750 sprintf(ldpath, "%s=%s:%s", XPD_LIBPATH, libdir, mgr->BareLibPath());
02751 } else {
02752 ldpath = new char[32 + strlen(libdir)];
02753 sprintf(ldpath, "%s=%s", XPD_LIBPATH, libdir);
02754 }
02755 putenv(ldpath);
02756
02757 char *rootsys = (char *) r->Dir();
02758 ev = new char[15 + strlen(rootsys)];
02759 sprintf(ev, "ROOTSYS=%s", rootsys);
02760 putenv(ev);
02761
02762
02763 char *bindir = (char *) r->BinDir();
02764 ev = new char[15 + strlen(bindir)];
02765 sprintf(ev, "ROOTBINDIR=%s", bindir);
02766 putenv(ev);
02767
02768
02769 char *confdir = (char *) r->DataDir();
02770 ev = new char[20 + strlen(confdir)];
02771 sprintf(ev, "ROOTCONFDIR=%s", confdir);
02772 putenv(ev);
02773
02774
02775 ev = new char[20 + strlen(mgr->TMPdir())];
02776 sprintf(ev, "TMPDIR=%s", mgr->TMPdir());
02777 putenv(ev);
02778
02779
02780 return 0;
02781 }
02782
02783
02784 TRACE(XERR, "XrdROOT instance undefined!");
02785 return -1;
02786 }
02787
02788
02789 void XrdProofdProofServMgr::GetTagDirs(XrdProofdProtocol *p, XrdProofdProofServ *xps,
02790 XrdOucString &sesstag, XrdOucString &topsesstag,
02791 XrdOucString &sessiondir, XrdOucString &sesswrkdir)
02792 {
02793
02794
02795
02796 XrdOucString udir = p->Client()->Sandbox()->Dir();
02797
02798
02799 XrdOucString host = fMgr->Host();
02800 if (host.find(".") != STR_NPOS)
02801 host.erase(host.find("."));
02802 XPDFORM(sesstag, "%s-%d-%d", host.c_str(), (int)time(0), (int)getpid());
02803
02804
02805 topsesstag = sesstag;
02806 sessiondir = udir;
02807 if (p->ConnType() == kXPD_ClientMaster) {
02808 sessiondir += "/session-";
02809 sessiondir += sesstag;
02810 xps->SetTag(sesstag.c_str());
02811 } else {
02812 sessiondir += "/";
02813 sessiondir += xps->Tag();
02814 topsesstag = xps->Tag();
02815 topsesstag.replace("session-","");
02816 }
02817
02818
02819 if (XrdProofdAux::AssertDir(sessiondir.c_str(), p->Client()->UI(),
02820 fMgr->ChangeOwn()) == -1) {
02821 return;
02822 }
02823
02824
02825 sesswrkdir = sessiondir;
02826 if (p->ConnType() == kXPD_MasterWorker) {
02827 XPDFORM(sesswrkdir, "%s/worker-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
02828 } else {
02829 XPDFORM(sesswrkdir, "%s/master-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
02830 }
02831
02832
02833 return;
02834 }
02835
02836
02837 static int WriteSessRCs(const char *, XpdEnv *erc, void *f)
02838 {
02839
02840 XPDLOC(SMGR, "WriteSessRCs")
02841
02842 XrdOucString emsg;
02843 FILE *frc = (FILE *)f;
02844 if (frc && erc) {
02845 XrdOucString rc = erc->fEnv;
02846 if (rc.length() > 0) {
02847 if (rc.find("Proof.DataSetManager") != STR_NPOS) {
02848 TRACE(ALL,"Proof.DataSetManager ignored: use xpd.datasetsrc to define dataset managers");
02849 } else {
02850 fprintf(frc, "%s\n", rc.c_str());
02851 }
02852 }
02853
02854 return 0;
02855 } else {
02856 emsg = "file or input entry undefined";
02857 }
02858
02859
02860 TRACE(XERR,"protocol error: "<<emsg);
02861 return 1;
02862 }
02863
02864
02865 int XrdProofdProofServMgr::SetProofServEnv(XrdProofdProtocol *p, void *input)
02866 {
02867
02868 XPDLOC(SMGR, "ProofServMgr::SetProofServEnv")
02869
02870 char *ev = 0;
02871
02872
02873 if (!p || !p->Client() || !input) {
02874 TRACE(XERR, "at leat one input is invalid - cannot continue");
02875 return -1;
02876 }
02877
02878
02879 int rootvers = p->Client()->ROOT() ? p->Client()->ROOT()->SrvProtVers() : -1;
02880 TRACE(DBG, "rootvers: "<< rootvers);
02881 if (rootvers < 14 && rootvers > -1)
02882 return SetProofServEnvOld(p, input);
02883
02884 ProofServEnv_t *in = (ProofServEnv_t *)input;
02885
02886
02887 XrdProofdProofServ *xps = in->fPS;
02888 if (!xps) {
02889 TRACE(XERR, "unable to get instance of proofserv proxy");
02890 return -1;
02891 }
02892 int psid = xps->ID();
02893 TRACE(REQ, "psid: "<<psid<<", log: "<<in->fLogLevel);
02894
02895
02896 XrdOucString udir = p->Client()->Sandbox()->Dir();
02897 TRACE(DBG, "sandbox for "<<p->Client()->User()<<" is: "<<udir);
02898 TRACE(DBG, "session unique tag "<<in->fSessionTag);
02899 TRACE(DBG, "session dir " << in->fSessionDir);
02900 TRACE(DBG, "session working dir:" << in->fWrkDir);
02901
02902
02903 if (XrdProofdAux::ChangeToDir(in->fSessionDir.c_str(), p->Client()->UI(),
02904 fMgr->ChangeOwn()) != 0) {
02905 TRACE(XERR, "couldn't change directory to " << in->fSessionDir);
02906 return -1;
02907 }
02908
02909
02910 if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
02911 TRACE(XERR, "problems setting basic environment - exit");
02912 return -1;
02913 }
02914
02915
02916 TRACE(DBG, "creating env file");
02917 XrdOucString rcfile = in->fWrkDir;
02918 rcfile += ".rootrc";
02919 FILE *frc = fopen(rcfile.c_str(), "w");
02920 if (!frc) {
02921 TRACE(XERR, "unable to open rootrc file: "<<rcfile);
02922 return -1;
02923 }
02924
02925 if (XrdProofdAux::SymLink(rcfile.c_str(), "session.rootrc") != 0) {
02926 TRACE(XERR, "problems creating symlink to 'session.rootrc' (errno: "<<errno<<")");
02927 }
02928 TRACE(REQ, "session rootrc file: "<< rcfile);
02929
02930
02931 fprintf(frc, "# XrdProofdProtocol listening port\n");
02932 fprintf(frc, "ProofServ.XpdPort: %d\n", fMgr->Port());
02933
02934
02935 if (fMgr->LocalROOT() && strlen(fMgr->LocalROOT()) > 0) {
02936 fprintf(frc, "# Prefix to be prepended to local paths\n");
02937 fprintf(frc, "Path.Localroot: %s\n", fMgr->LocalROOT());
02938 }
02939
02940
02941 if (fMgr->PoolURL() && strlen(fMgr->PoolURL()) > 0) {
02942 XrdOucString purl(fMgr->PoolURL());
02943 if (!purl.endswith("/"))
02944 purl += "/";
02945 fprintf(frc, "# URL for the data pool entry-point\n");
02946 fprintf(frc, "ProofServ.PoolUrl: %s\n", purl.c_str());
02947 }
02948
02949
02950 fprintf(frc, "# The session working dir\n");
02951 fprintf(frc, "ProofServ.SessionDir: %s\n", in->fWrkDir.c_str());
02952
02953
02954 fprintf(frc, "# Proof Log/Debug level\n");
02955 fprintf(frc, "Proof.DebugLevel: %d\n", in->fLogLevel);
02956
02957
02958 fprintf(frc, "# Ordinal number\n");
02959 fprintf(frc, "ProofServ.Ordinal: %s\n", xps->Ordinal());
02960
02961
02962 if (p->Client()->ROOT()) {
02963 fprintf(frc, "# ROOT Version tag\n");
02964 fprintf(frc, "ProofServ.RootVersionTag: %s\n", p->Client()->ROOT()->Tag());
02965 }
02966
02967 if (p->Client()->Group()) {
02968 fprintf(frc, "# Proof group\n");
02969 fprintf(frc, "ProofServ.ProofGroup: %s\n", p->Client()->Group());
02970 }
02971
02972
02973 if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->GetCfgFile()) {
02974 fprintf(frc, "# File with group information\n");
02975 fprintf(frc, "Proof.GroupFile: %s\n", fMgr->GroupsMgr()->GetCfgFile());
02976 }
02977
02978
02979 fprintf(frc, "# Users sandbox\n");
02980 fprintf(frc, "ProofServ.Sandbox: %s\n", udir.c_str());
02981
02982
02983 if (fMgr->Image() && strlen(fMgr->Image()) > 0) {
02984 fprintf(frc, "# Server image\n");
02985 fprintf(frc, "ProofServ.Image: %s\n", fMgr->Image());
02986 }
02987
02988
02989 fprintf(frc, "# Session tag\n");
02990 fprintf(frc, "ProofServ.SessionTag: %s\n", in->fTopSessionTag.c_str());
02991
02992
02993 fprintf(frc, "# Session admin path\n");
02994 int proofvrs = (p->Client()->ROOT()) ? p->Client()->ROOT()->SrvProtVers() : -1;
02995 if (proofvrs < 0 || proofvrs < 27) {
02996
02997 fprintf(frc, "ProofServ.AdminPath: %s\n", xps->AdminPath());
02998 } else {
02999
03000 fprintf(frc, "ProofServ.AdminPath: %s.status\n", xps->AdminPath());
03001 }
03002
03003
03004 if (fMgr->NetMgr()->WorkerUsrCfg()) {
03005 fprintf(frc, "# Whether user specific config files are enabled\n");
03006 fprintf(frc, "ProofServ.UseUserCfg: 1\n");
03007 }
03008
03009 fprintf(frc, "# Open socket\n");
03010 fprintf(frc, "ProofServ.OpenSock: %s\n", xps->UNIXSockPath());
03011
03012 fprintf(frc, "# Entity\n");
03013 if (p->Client()->UI().fGroup.length() > 0)
03014 fprintf(frc, "ProofServ.Entity: %s:%s@%s\n",
03015 p->Client()->User(), p->Client()->UI().fGroup.c_str(), p->Link()->Host());
03016 else
03017 fprintf(frc, "ProofServ.Entity: %s@%s\n", p->Client()->User(), p->Link()->Host());
03018
03019
03020
03021 fprintf(frc, "# Session ID\n");
03022 fprintf(frc, "ProofServ.SessionID: %d\n", psid);
03023
03024
03025 fprintf(frc, "# Client ID\n");
03026 fprintf(frc, "ProofServ.ClientID: %d\n", p->CID());
03027
03028
03029 fprintf(frc, "# Client Protocol\n");
03030 fprintf(frc, "ProofServ.ClientVersion: %d\n", p->ProofProtocol());
03031
03032
03033 if (in->fCfg.length() > 0) {
03034 if (in->fCfg == "masteronly") {
03035 fprintf(frc, "# MasterOnly option\n");
03036
03037 fprintf(frc, "Proof.MasterOnly: 1\n");
03038 } else {
03039 fprintf(frc, "# Config file\n");
03040
03041 fprintf(frc, "ProofServ.ProofConfFile: %s\n", in->fCfg.c_str());
03042 }
03043 } else {
03044 fprintf(frc, "# Config file\n");
03045 if (fMgr->IsSuperMst()) {
03046 fprintf(frc, "# Config file\n");
03047 fprintf(frc, "ProofServ.ProofConfFile: sm:\n");
03048 } else if (fProofPlugin.length() > 0) {
03049 fprintf(frc, "# Config file\n");
03050 fprintf(frc, "ProofServ.ProofConfFile: %s\n", fProofPlugin.c_str());
03051 }
03052 }
03053
03054
03055
03056 fprintf(frc, "# Default settings for XrdClient\n");
03057 fprintf(frc, "XNet.FirstConnectMaxCnt 3\n");
03058 fprintf(frc, "XNet.ConnectTimeout 5\n");
03059
03060
03061 int vrscode = (p->Client()->ROOT()) ? p->Client()->ROOT()->VersionCode() : -1;
03062 if (vrscode > 0 && vrscode < XrdROOT::GetVersionCode(5,24,0)) {
03063 fprintf(frc, "# Force remote reading also for local files to avoid a wrong TTreeCache initialization\n");
03064 fprintf(frc, "Path.ForceRemote 1\n");
03065 }
03066
03067
03068 { XrdSysMutexHelper mhp(fEnvsMutex);
03069 if (fProofServRCs.size() > 0) {
03070 fprintf(frc, "# Additional rootrcs (xpd.putrc directives)\n");
03071
03072 XrdOucHash<XpdEnv> sessrcs;
03073 std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
03074 for ( ; ircs != fProofServRCs.end(); ircs++) {
03075 int rcmatch = (*ircs).Matches(p->Client()->User(), p->Client()->Group(),
03076 p->Client()->ROOT()->SvnRevision(),
03077 p->Client()->ROOT()->VersionCode());
03078 if (rcmatch >= 0) {
03079 XpdEnv *rcenv = sessrcs.Find((*ircs).fName.c_str());
03080 if (rcenv) {
03081 int rcmtcex = rcenv->Matches(p->Client()->User(), p->Client()->Group(),
03082 p->Client()->ROOT()->SvnRevision(),
03083 p->Client()->ROOT()->VersionCode());
03084 if (rcmatch > rcmtcex) {
03085
03086 rcenv = &(*ircs);
03087 sessrcs.Rep(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
03088 }
03089 } else {
03090
03091 rcenv = &(*ircs);
03092 sessrcs.Add(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
03093 }
03094 TRACE(HDBG, "Adding: "<<(*ircs).fEnv);
03095 }
03096 }
03097 sessrcs.Apply(WriteSessRCs, (void *)frc);
03098 }
03099 }
03100
03101 if (fMgr->DataSetSrcs()->size() > 0) {
03102 fprintf(frc, "# Dataset sources\n");
03103 XrdOucString rc("Proof.DataSetManager: ");
03104 std::list<XrdProofdDSInfo *>::iterator ii;
03105 for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ii++) {
03106 if (ii != fMgr->DataSetSrcs()->begin()) rc += ", ";
03107 rc += (*ii)->fType;
03108 rc += " dir:";
03109 rc += (*ii)->fUrl;
03110 rc += " opt:";
03111 rc += (*ii)->fOpts;
03112 }
03113 fprintf(frc, "%s\n", rc.c_str());
03114 }
03115
03116
03117 if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0) {
03118 fprintf(frc, "# Data directory\n");
03119 XrdOucString rc;
03120 XPDFORM(rc, "ProofServ.DataDir: %s/%s/%s/%s/%s", fMgr->DataDir(),
03121 p->Client()->Group(), p->Client()->User(), xps->Ordinal(),
03122 in->fSessionTag.c_str());
03123 fprintf(frc, "%s\n", rc.c_str());
03124 }
03125
03126
03127 fclose(frc);
03128
03129
03130 XrdOucString envfile = in->fWrkDir;
03131 envfile += ".env";
03132 FILE *fenv = fopen(envfile.c_str(), "w");
03133 if (!fenv) {
03134 TRACE(XERR, "unable to open env file: "<<envfile);
03135 return -1;
03136 }
03137 TRACE(REQ, "environment file: "<< envfile);
03138
03139
03140 if (p->AuthProt()) {
03141
03142
03143 XrdOucString secenvs(getenv("XrdSecENVS"));
03144 if (secenvs.length() > 0) {
03145
03146 XrdOucString env;
03147 int from = 0;
03148 while ((from = secenvs.tokenize(env, from, ',')) != -1) {
03149 if (env.length() > 0) {
03150
03151 ev = new char[env.length()+1];
03152 strncpy(ev, env.c_str(), env.length());
03153 ev[env.length()] = 0;
03154 putenv(ev);
03155 fprintf(fenv, "%s\n", ev);
03156 TRACE(DBG, ev);
03157 }
03158 }
03159 }
03160
03161
03162 XrdSecCredentials *creds = p->AuthProt()->getCredentials();
03163 if (creds) {
03164 int lev = strlen("XrdSecCREDS=")+creds->size;
03165 ev = new char[lev+1];
03166 strcpy(ev, "XrdSecCREDS=");
03167 memcpy(ev+strlen("XrdSecCREDS="), creds->buffer, creds->size);
03168 ev[lev] = 0;
03169 putenv(ev);
03170 TRACE(DBG, "XrdSecCREDS set");
03171
03172
03173 if (!strncmp(p->AuthProt()->Entity.prot, "pwd", 3)) {
03174 XrdOucString credsdir = udir;
03175 credsdir += "/.creds";
03176
03177 if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
03178 if (SaveAFSkey(creds, credsdir.c_str(), p->Client()->UI()) == 0) {
03179 ev = new char[strlen("ROOTPROOFAFSCREDS=")+credsdir.length()+strlen("/.afs")+2];
03180 sprintf(ev, "ROOTPROOFAFSCREDS=%s/.afs", credsdir.c_str());
03181 putenv(ev);
03182 fprintf(fenv, "ROOTPROOFAFSCREDS has been set\n");
03183 TRACE(DBG, ev);
03184 } else {
03185 TRACE(DBG, "problems in saving AFS key");
03186 }
03187 } else {
03188 TRACE(XERR, "unable to create creds dir: "<<credsdir);
03189 return -1;
03190 }
03191 }
03192 }
03193 }
03194
03195
03196 fprintf(fenv, "%s=%s\n", XPD_LIBPATH, getenv(XPD_LIBPATH));
03197
03198
03199 fprintf(fenv, "ROOTSYS=%s\n", xps->ROOT()->Dir());
03200
03201
03202 fprintf(fenv, "ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
03203
03204
03205 fprintf(fenv, "TMPDIR=%s\n", fMgr->TMPdir());
03206
03207
03208 ev = new char[strlen("ROOTRCFILE=")+rcfile.length()+2];
03209 sprintf(ev, "ROOTRCFILE=%s", rcfile.c_str());
03210 putenv(ev);
03211 fprintf(fenv, "%s\n", ev);
03212 TRACE(DBG, ev);
03213
03214
03215 ev = new char[strlen("ROOTVERSIONTAG=")+strlen(p->Client()->ROOT()->Tag())+2];
03216 sprintf(ev, "ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
03217 putenv(ev);
03218 fprintf(fenv, "%s\n", ev);
03219 TRACE(DBG, ev);
03220
03221
03222 ev = new char[strlen("ROOTPROOFLOGFILE=") + in->fLogFile.length() + 2];
03223 sprintf(ev, "ROOTPROOFLOGFILE=%s", in->fLogFile.c_str());
03224 putenv(ev);
03225 fprintf(fenv, "%s\n", ev);
03226 xps->SetFileout(in->fLogFile.c_str());
03227 TRACE(DBG, ev);
03228
03229
03230 XrdOucString locdatasrv("root://");
03231 locdatasrv += fMgr->Host();
03232 ev = new char[strlen("LOCALDATASERVER=") + locdatasrv.length() + 2];
03233 sprintf(ev, "LOCALDATASERVER=%s", locdatasrv.c_str());
03234 putenv(ev);
03235 fprintf(fenv, "%s\n", ev);
03236 TRACE(DBG, ev);
03237
03238
03239 if (CfgFile()) {
03240 ev = new char[strlen("XRDCF=")+strlen(CfgFile())+2];
03241 sprintf(ev, "XRDCF=%s", CfgFile());
03242 putenv(ev);
03243 fprintf(fenv, "%s\n", ev);
03244 TRACE(DBG, ev);
03245 }
03246
03247
03248 { XrdSysMutexHelper mhp(fEnvsMutex);
03249 if (fProofServEnvs.size() > 0) {
03250
03251 XrdOucHash<XpdEnv> sessenvs;
03252 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
03253 for ( ; ienvs != fProofServEnvs.end(); ienvs++) {
03254 int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
03255 p->Client()->ROOT()->SvnRevision(),
03256 p->Client()->ROOT()->VersionCode());
03257 if (envmatch >= 0) {
03258 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
03259 if (env) {
03260 int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
03261 p->Client()->ROOT()->SvnRevision(),
03262 p->Client()->ROOT()->VersionCode());
03263 if (envmatch > envmtcex) {
03264
03265 env = &(*ienvs);
03266 sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
03267 }
03268 } else {
03269
03270 env = &(*ienvs);
03271 sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
03272 }
03273 TRACE(HDBG, "Adding: "<<(*ienvs).fEnv);
03274 }
03275 }
03276 XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv};
03277 sessenvs.Apply(WriteSessEnvs, (void *)&xpwe);
03278 }
03279 }
03280
03281 if (xps->UserEnvs() &&
03282 strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),"=")) {
03283
03284 XrdOucString ue = xps->UserEnvs();
03285 XrdOucString env, namelist;
03286 int from = 0, ieq = -1;
03287 while ((from = ue.tokenize(env, from, ',')) != -1) {
03288 if (env.length() > 0 && (ieq = env.find('=')) != -1) {
03289
03290 ResolveKeywords(env, in);
03291 ev = new char[env.length()+1];
03292 strncpy(ev, env.c_str(), env.length());
03293 ev[env.length()] = 0;
03294 putenv(ev);
03295 fprintf(fenv, "%s\n", ev);
03296 TRACE(DBG, ev);
03297 env.erase(ieq);
03298 if (namelist.length() > 0)
03299 namelist += ',';
03300 namelist += env;
03301 }
03302 }
03303
03304 ev = new char[strlen("PROOF_ALLVARS=") + namelist.length() + 2];
03305 sprintf(ev, "PROOF_ALLVARS=%s", namelist.c_str());
03306 putenv(ev);
03307 fprintf(fenv, "%s\n", ev);
03308 TRACE(DBG, ev);
03309 }
03310
03311
03312 fclose(fenv);
03313
03314
03315 TRACE(REQ, "creating symlink");
03316 XrdOucString syml = udir;
03317 if (p->ConnType() == kXPD_MasterWorker)
03318 syml += "/last-worker-session";
03319 else
03320 syml += "/last-master-session";
03321 if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
03322 TRACE(XERR, "problems creating symlink to "
03323 " last session (errno: "<<errno<<")");
03324 }
03325
03326
03327 TRACE(REQ, "done");
03328 return 0;
03329 }
03330
03331
03332 int XrdProofdProofServMgr::CleanupLostProofServ()
03333 {
03334
03335
03336
03337
03338
03339 XPDLOC(SMGR, "ProofServMgr::CleanupLostProofServ")
03340
03341 if (!fCheckLost) {
03342 TRACE(REQ, "disabled ...");
03343 return 0;
03344 }
03345
03346 TRACE(REQ, "checking for orphalin proofserv processes ...");
03347 int nk = 0;
03348
03349
03350 std::map<int,XrdOucString> procs;
03351 if (XrdProofdAux::GetProcesses("proofserv", &procs) <= 0) {
03352 TRACE(DBG, " no proofservs around: nothing to do");
03353 return 0;
03354 }
03355
03356 XrdProofUI ui;
03357 if (XrdProofdAux::GetUserInfo(fMgr->EffectiveUser(), ui) != 0) {
03358 TRACE(DBG, "problems getting info for user " << fMgr->EffectiveUser());
03359 return -1;
03360 }
03361
03362
03363 XrdOucRash<int, int> controlled, xrdproc;
03364
03365
03366 XrdOucHash<XrdOucString> sessionspaths;
03367
03368
03369 int pid, ia, a;
03370 XrdOucString cmd, apath, pidpath, sessiondir, emsg, rest, after;
03371 std::map<int,XrdOucString>::iterator ip;
03372 for (ip = procs.begin(); ip != procs.end(); ip++) {
03373 pid = ip->first;
03374 cmd = ip->second;
03375 if ((ia = cmd.find("xpdpath:")) != STR_NPOS) {
03376 cmd.tokenize(apath, ia, ' ');
03377 apath.replace("xpdpath:", "");
03378 if (apath.length() <= 0) {
03379 TRACE(ALL, "admin path not found; initial cmd line: "<<cmd);
03380 continue;
03381 }
03382
03383 XPDFORM(pidpath, "%s/xrootd.pid", apath.c_str());
03384 int xpid = XrdProofdAux::GetIDFromPath(pidpath.c_str(), emsg);
03385 int *alive = xrdproc.Find(xpid);
03386 if (!alive) {
03387 a = (XrdProofdAux::VerifyProcessByID(xpid, fParentExecs.c_str())) ? 1 : 0;
03388 xrdproc.Add(xpid, a);
03389 if (!(alive = xrdproc.Find(xpid))) {
03390 TRACE(ALL, "unable to add info in the Rash table!");
03391 }
03392 } else {
03393 a = *alive;
03394 }
03395
03396
03397 bool ok = 0;
03398 if (a == 1) {
03399 const char *subdir[2] = {"activesessions", "terminatedsessions"};
03400 for (int i = 0; i < 2; i++) {
03401 XPDFORM(sessiondir, "%s/%s", apath.c_str(), subdir[i]);
03402 if (!sessionspaths.Find(sessiondir.c_str())) {
03403 DIR *sdir = opendir(sessiondir.c_str());
03404 if (!sdir) {
03405 XPDFORM(emsg, "cannot open '%s' - errno: %d", apath.c_str(), errno);
03406 TRACE(XERR, emsg.c_str());
03407 continue;
03408 }
03409 struct dirent *sent = 0;
03410 while ((sent = readdir(sdir))) {
03411 if (!strncmp(sent->d_name, ".", 1) || !strncmp(sent->d_name, "..", 2))
03412 continue;
03413
03414 int ppid = XrdProofdAux::ParsePidPath(sent->d_name, rest, after);
03415
03416 controlled.Add(ppid, ppid);
03417 }
03418 closedir(sdir);
03419 sessionspaths.Add(sessiondir.c_str(), 0, 0, Hash_data_is_key);
03420 }
03421 ok = (controlled.Find(pid)) ? 1 : ok;
03422
03423 if (ok) break;
03424 }
03425 }
03426
03427 if (!ok) {
03428 TRACE(ALL,"process: "<<pid<<" lost its controller: killing");
03429 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
03430 nk++;
03431 }
03432 }
03433
03434 }
03435
03436
03437 return nk;
03438 }
03439
03440
03441 int XrdProofdProofServMgr::CleanupProofServ(bool all, const char *usr)
03442 {
03443
03444
03445
03446
03447
03448
03449
03450 XPDLOC(SMGR, "ProofServMgr::CleanupProofServ")
03451
03452 TRACE(REQ, "all: "<<all<<", usr: " << (usr ? usr : "undef"));
03453 int nk = 0;
03454
03455
03456 const char *pn = "proofserv";
03457
03458
03459 XrdProofUI ui;
03460 int refuid = -1;
03461 if (!all) {
03462 if (!usr) {
03463 TRACE(DBG, "usr must be defined for all = FALSE");
03464 return -1;
03465 }
03466 if (XrdProofdAux::GetUserInfo(usr, ui) != 0) {
03467 TRACE(DBG, "problems getting info for user " << usr);
03468 return -1;
03469 }
03470 refuid = ui.fUid;
03471 }
03472
03473 #if defined(linux)
03474
03475 DIR *dir = opendir("/proc");
03476 if (!dir) {
03477 XrdOucString emsg("cannot open /proc - errno: ");
03478 emsg += errno;
03479 TRACE(DBG, emsg.c_str());
03480 return -1;
03481 }
03482
03483 struct dirent *ent = 0;
03484 while ((ent = readdir(dir))) {
03485 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
03486 if (DIGIT(ent->d_name[0])) {
03487 XrdOucString fn("/proc/", 256);
03488 fn += ent->d_name;
03489 fn += "/status";
03490
03491 FILE *ffn = fopen(fn.c_str(), "r");
03492 if (!ffn) {
03493 XrdOucString emsg("cannot open file ");
03494 emsg += fn; emsg += " - errno: "; emsg += errno;
03495 TRACE(HDBG, emsg);
03496 continue;
03497 }
03498
03499 bool xname = 1, xpid = 1, xppid = 1;
03500 bool xuid = (all) ? 0 : 1;
03501 int pid = -1;
03502 int ppid = -1;
03503 char line[2048] = { 0 };
03504 while (fgets(line, sizeof(line), ffn) &&
03505 (xname || xpid || xppid || xuid)) {
03506
03507 if (xname && strstr(line, "Name:")) {
03508 if (!strstr(line, pn))
03509 break;
03510 xname = 0;
03511 }
03512 if (xpid && strstr(line, "Pid:")) {
03513 pid = (int) XrdProofdAux::GetLong(&line[strlen("Pid:")]);
03514 xpid = 0;
03515 }
03516 if (xppid && strstr(line, "PPid:")) {
03517 ppid = (int) XrdProofdAux::GetLong(&line[strlen("PPid:")]);
03518
03519 if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str()))
03520
03521 break;
03522 xppid = 0;
03523 }
03524 if (xuid && strstr(line, "Uid:")) {
03525 int uid = (int) XrdProofdAux::GetLong(&line[strlen("Uid:")]);
03526 if (refuid == uid)
03527 xuid = 0;
03528 }
03529 }
03530
03531 fclose(ffn);
03532
03533 if (!xname && !xpid && !xppid && !xuid) {
03534
03535 bool muok = 1;
03536 if (fMgr->MultiUser() && !all) {
03537
03538
03539 muok = 0;
03540 XrdProofdProofServ *srv = GetActiveSession(pid);
03541 if (!srv || (srv && !strcmp(usr, srv->Client())))
03542 muok = 1;
03543 }
03544 if (muok)
03545 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
03546 nk++;
03547 }
03548 }
03549 }
03550
03551 closedir(dir);
03552
03553 #elif defined(__sun)
03554
03555
03556 DIR *dir = opendir("/proc");
03557 if (!dir) {
03558 XrdOucString emsg("cannot open /proc - errno: ");
03559 emsg += errno;
03560 TRACE(DBG, emsg);
03561 return -1;
03562 }
03563
03564 struct dirent *ent = 0;
03565 while ((ent = readdir(dir))) {
03566 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
03567 if (DIGIT(ent->d_name[0])) {
03568 XrdOucString fn("/proc/", 256);
03569 fn += ent->d_name;
03570 fn += "/psinfo";
03571
03572 int ffd = open(fn.c_str(), O_RDONLY);
03573 if (ffd <= 0) {
03574 XrdOucString emsg("cannot open file ");
03575 emsg += fn; emsg += " - errno: "; emsg += errno;
03576 TRACE(HDBG, emsg);
03577 continue;
03578 }
03579
03580 bool xname = 1;
03581 bool xuid = (all) ? 0 : 1;
03582 bool xppid = 1;
03583
03584 psinfo_t psi;
03585 if (read(ffd, &psi, sizeof(psinfo_t)) != sizeof(psinfo_t)) {
03586 XrdOucString emsg("cannot read ");
03587 emsg += fn; emsg += ": errno: "; emsg += errno;
03588 TRACE(XERR, emsg);
03589 close(ffd);
03590 continue;
03591 }
03592
03593 close(ffd);
03594
03595
03596 if (xname) {
03597 if (!strstr(psi.pr_fname, pn))
03598 continue;
03599 xname = 0;
03600 }
03601
03602 if (xuid) {
03603 if (refuid == psi.pr_uid)
03604 xuid = 0;
03605 }
03606
03607 int ppid = psi.pr_ppid;
03608 if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str())) {
03609
03610 continue;
03611 xppid = 0;
03612 }
03613
03614
03615 if (!xname && !xppid && !xuid) {
03616 bool muok = 1;
03617 if (fMgr->MultiUser() && !all) {
03618
03619
03620 muok = 0;
03621 XrdProofdProofServ *srv = GetActiveSession(psi.pr_pid);
03622 if (!srv || (srv && !strcmp(usr, srv->Client())))
03623 muok = 1;
03624 }
03625 if (muok)
03626 if (XrdProofdAux::KillProcess(psi.pr_pid, 1, ui, fMgr->ChangeOwn()) == 0)
03627 nk++;
03628 }
03629 }
03630 }
03631
03632 closedir(dir);
03633
03634 #elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__)
03635
03636
03637 kinfo_proc *pl = 0;
03638 int np;
03639 int ern = 0;
03640 if ((ern = XrdProofdAux::GetMacProcList(&pl, np)) != 0) {
03641 XrdOucString emsg("cannot get the process list: errno: ");
03642 emsg += ern;
03643 TRACE(XERR, emsg);
03644 return -1;
03645 }
03646
03647
03648 int ii = np;
03649 while (ii--) {
03650 if (strstr(pl[ii].kp_proc.p_comm, pn)) {
03651 if (all || (int)(pl[ii].kp_eproc.e_ucred.cr_uid) == refuid) {
03652
03653 int ppid = pl[ii].kp_eproc.e_ppid;
03654 bool xppid = 0;
03655 if (ppid != getpid()) {
03656 int jj = np;
03657 while (jj--) {
03658 if (strstr(pl[jj].kp_proc.p_comm, "xrootd") &&
03659 pl[jj].kp_proc.p_pid == ppid) {
03660 xppid = 1;
03661 break;
03662 }
03663 }
03664 }
03665 if (!xppid) {
03666 bool muok = 1;
03667 if (fMgr->MultiUser() && !all) {
03668
03669
03670 muok = 0;
03671 XrdProofdProofServ *srv = GetActiveSession(pl[np].kp_proc.p_pid);
03672 if (!srv || (srv && !strcmp(usr, srv->Client())))
03673 muok = 1;
03674 }
03675 if (muok)
03676
03677 if (XrdProofdAux::KillProcess(pl[np].kp_proc.p_pid, 1, ui, fMgr->ChangeOwn()))
03678 nk++;
03679 }
03680 }
03681 }
03682 }
03683
03684 free(pl);
03685 #else
03686
03687
03688
03689 XrdOucString cmd = "ps ";
03690 bool busr = 0;
03691 const char *cusr = (usr && strlen(usr) && fSuperUser) ? usr : fPClient->ID();
03692 if (all) {
03693 cmd += "ax";
03694 } else {
03695 cmd += "-U ";
03696 cmd += cusr;
03697 cmd += " -u ";
03698 cmd += cusr;
03699 cmd += " -f";
03700 busr = 1;
03701 }
03702 cmd += " | grep proofserv 2>/dev/null";
03703
03704
03705 char cpid[10];
03706 sprintf(cpid, "%d", getpid());
03707
03708
03709 XrdOucString pids = ":";
03710 FILE *fp = popen(cmd.c_str(), "r");
03711 if (fp != 0) {
03712 char line[2048] = { 0 };
03713 while (fgets(line, sizeof(line), fp)) {
03714
03715 char *px = strstr(line, "xpd");
03716 if (!px)
03717
03718 continue;
03719 char *pi = strstr(px+3, cpid);
03720 if (!pi) {
03721
03722 pi = px + 3;
03723 int ppid = (int) XrdProofdAux::GetLong(pi);
03724 TRACE(HDBG, "found alternative parent ID: "<< ppid);
03725
03726 if (XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str()))
03727 continue;
03728 }
03729
03730 int from = 0;
03731 if (busr)
03732 from += strlen(cusr);
03733 int pid = (int) XrdProofdAux::GetLong(&line[from]);
03734 bool muok = 1;
03735 if (fMgr->MultiUser() && !all) {
03736
03737
03738 muok = 0;
03739 XrdProofdProofServ *srv = GetActiveSession(pid);
03740 if (!srv || (srv && !strcmp(usr, srv->Client())))
03741 muok = 1;
03742 }
03743 if (muok)
03744
03745 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
03746 nk++;
03747 }
03748 pclose(fp);
03749 } else {
03750
03751 return -1;
03752 }
03753 #endif
03754
03755
03756 return nk;
03757 }
03758
03759
03760 int XrdProofdProofServMgr::SetUserOwnerships(XrdProofdProtocol *p,
03761 const char *ord, const char *stag)
03762 {
03763
03764
03765 XPDLOC(SMGR, "ProofServMgr::SetUserOwnerships")
03766
03767 TRACE(REQ, "enter");
03768
03769
03770
03771 if (fMgr->DataSetSrcs()->size() > 0) {
03772 XrdProofUI ui;
03773 XrdProofdAux::GetUserInfo(XrdProofdProtocol::EUidAtStartup(), ui);
03774 std::list<XrdProofdDSInfo *>::iterator ii;
03775 for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ii++) {
03776 TRACE(ALL, "Checking dataset source: url:"<<(*ii)->fUrl<<", local:"
03777 <<(*ii)->fLocal<<", rw:"<<(*ii)->fRW);
03778 if ((*ii)->fLocal && (*ii)->fRW) {
03779 XrdOucString d;
03780 XPDFORM(d, "%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str());
03781 if (XrdProofdAux::AssertDir(d.c_str(), ui, fMgr->ChangeOwn()) == 0) {
03782 if (XrdProofdAux::ChangeMod(d.c_str(), 0777) == 0) {
03783 XPDFORM(d, "%s/%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str(),
03784 p->Client()->UI().fUser.c_str());
03785 if (XrdProofdAux::AssertDir(d.c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
03786 if (XrdProofdAux::ChangeMod(d.c_str(), 0755) != 0) {
03787 TRACE(XERR, "problems setting permissions 0755 on: "<<d);
03788 }
03789 } else {
03790 TRACE(XERR, "problems asserting: "<<d);
03791 }
03792 } else {
03793 TRACE(XERR, "problems setting permissions 0777 on: "<<d);
03794 }
03795 } else {
03796 TRACE(XERR, "problems asserting: "<<d);
03797 }
03798 }
03799 }
03800 }
03801
03802
03803
03804 if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0 && ord && stag) {
03805 XrdProofUI ui;
03806 XrdProofdAux::GetUserInfo(XrdProofdProtocol::EUidAtStartup(), ui);
03807 XrdOucString dgr, dus[3];
03808 XPDFORM(dgr, "%s/%s", fMgr->DataDir(), p->Client()->UI().fGroup.c_str());
03809 if (XrdProofdAux::AssertDir(dgr.c_str(), ui, fMgr->ChangeOwn()) == 0) {
03810 if (XrdProofdAux::ChangeMod(dgr.c_str(), 0777) == 0) {
03811 unsigned int mode = 0755;
03812 if (strchr(fMgr->DataDirOpts(), 'g')) mode = 0775;
03813 if (strchr(fMgr->DataDirOpts(), 'a') || strchr(fMgr->DataDirOpts(), 'o')) mode = 0777;
03814 XPDFORM(dus[0], "%s/%s", dgr.c_str(), p->Client()->UI().fUser.c_str());
03815 XPDFORM(dus[1], "%s/%s", dus[0].c_str(), ord);
03816 XPDFORM(dus[2], "%s/%s", dus[1].c_str(), stag);
03817 for (int i = 0; i < 3; i++) {
03818 if (XrdProofdAux::AssertDir(dus[i].c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
03819 if (XrdProofdAux::ChangeMod(dus[i].c_str(), mode) != 0) {
03820 TRACE(XERR, "problems setting permissions "<< oct << mode<<" on: "<<dus[i]);
03821 }
03822 } else {
03823 TRACE(XERR, "problems asserting: "<<dus[i]);
03824 break;
03825 }
03826 }
03827 } else {
03828 TRACE(XERR, "problems setting permissions 0777 on: "<<dgr);
03829 }
03830 } else {
03831 TRACE(XERR, "problems asserting: "<<dgr);
03832 }
03833 }
03834
03835 if (fMgr->ChangeOwn()) {
03836
03837 XrdOucString creds(p->Client()->Sandbox()->Dir());
03838 creds += "/.creds";
03839 if (XrdProofdAux::ChangeOwn(creds.c_str(), p->Client()->UI()) != 0) {
03840 TRACE(XERR, "can't change ownership of "<<creds);
03841 return -1;
03842 }
03843 }
03844
03845
03846 TRACE(REQ, "done");
03847 return 0;
03848 }
03849
03850
03851 int XrdProofdProofServMgr::SetUserEnvironment(XrdProofdProtocol *p)
03852 {
03853
03854
03855
03856
03857 XPDLOC(SMGR, "ProofServMgr::SetUserEnvironment")
03858
03859 TRACE(REQ, "enter");
03860
03861 if (XrdProofdAux::ChangeToDir(p->Client()->Sandbox()->Dir(),
03862 p->Client()->UI(), fMgr->ChangeOwn()) != 0) {
03863 TRACE(XERR, "couldn't change directory to "<< p->Client()->Sandbox()->Dir());
03864 return -1;
03865 }
03866
03867
03868 char *h = new char[8 + strlen(p->Client()->Sandbox()->Dir())];
03869 sprintf(h, "HOME=%s", p->Client()->Sandbox()->Dir());
03870 putenv(h);
03871 TRACE(DBG, "set "<<h);
03872
03873
03874 char *u = new char[8 + strlen(p->Client()->User())];
03875 sprintf(u, "USER=%s", p->Client()->User());
03876 putenv(u);
03877 TRACE(DBG, "set "<<u);
03878
03879
03880
03881 TRACE(DBG, "setting ACLs");
03882 if (fMgr->ChangeOwn() && (int) geteuid() != p->Client()->UI().fUid) {
03883
03884 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
03885 if (XpdBadPGuard(pGuard, p->Client()->UI().fUid)) {
03886 TRACE(XERR, "could not get privileges");
03887 return -1;
03888 }
03889
03890 initgroups(p->Client()->UI().fUser.c_str(), p->Client()->UI().fGid);
03891 }
03892
03893 if (fMgr->ChangeOwn()) {
03894
03895 TRACE(DBG, "acquiring target user identity: "<<(uid_t)p->Client()->UI().fUid<<
03896 ", "<<(gid_t)p->Client()->UI().fGid);
03897 if (XrdSysPriv::ChangePerm((uid_t)p->Client()->UI().fUid,
03898 (gid_t)p->Client()->UI().fGid) != 0) {
03899 TRACE(XERR, "can't acquire "<< p->Client()->UI().fUser <<" identity");
03900 return -1;
03901 }
03902 }
03903
03904
03905 TRACE(REQ, "done");
03906 return 0;
03907 }
03908
03909
03910 int XrdProofdProofServMgr::SaveAFSkey(XrdSecCredentials *c,
03911 const char *dir, XrdProofUI ui)
03912 {
03913
03914
03915 XPDLOC(SMGR, "ProofServMgr::SaveAFSkey")
03916
03917
03918 if (!dir || strlen(dir) <= 0) {
03919 TRACE(XERR, "dir name undefined");
03920 return -1;
03921 }
03922
03923
03924 if (!c) {
03925 TRACE(XERR, "credentials undefined");
03926 return -1;
03927 }
03928 TRACE(REQ, "dir: "<<dir);
03929
03930
03931 int lout = 0;
03932 char *out = new char[c->size];
03933 if (XrdSutFromHex(c->buffer, out, lout) != 0) {
03934 TRACE(XERR, "problems unparsing hex string");
03935 delete [] out;
03936 return -1;
03937 }
03938
03939
03940 char *key = out + 5;
03941 if (strncmp(key, "afs:", 4)) {
03942 TRACE(DBG, "string does not contain an AFS key");
03943 delete [] out;
03944 return 0;
03945 }
03946 key += 4;
03947
03948
03949 XrdOucString fn = dir;
03950 fn += "/.afs";
03951
03952 int rc = 0;
03953 struct stat st;
03954 if (stat(fn.c_str(), &st) != 0 && errno == ENOENT) {
03955
03956
03957 int fd = open(fn.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
03958 if (fd <= 0) {
03959 TRACE(XERR, "problems creating file - errno: " << errno);
03960 delete [] out;
03961 return -1;
03962 }
03963
03964 int lkey = lout - 9;
03965 if (XrdProofdAux::Write(fd, key, lkey) != lkey) {
03966 TRACE(XERR, "problems writing to file - errno: " << errno);
03967 rc = -1;
03968 }
03969
03970
03971 delete [] out;
03972 close(fd);
03973 } else {
03974 TRACE(XERR, "cannot stat existing file "<<fn<<" - errno: " << errno);
03975 delete [] out;
03976 return -1;
03977 }
03978
03979
03980 if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
03981 TRACE(XERR, "can't change ownership of "<<fn);
03982 }
03983
03984 return rc;
03985 }
03986
03987
03988 XrdProofdProofServ *XrdProofdProofServMgr::GetActiveSession(int pid)
03989 {
03990
03991
03992 XrdOucString key; key += pid;
03993 return fSessions.Find(key.c_str());
03994 }
03995
03996
03997 static int BroadcastPriority(const char *, XrdProofdProofServ *ps, void *s)
03998 {
03999
04000 XPDLOC(SMGR, "BroadcastPriority")
04001
04002 XpdBroadcastPriority_t *bp = (XpdBroadcastPriority_t *)s;
04003
04004 int nb = *(bp->fNBroadcast);
04005
04006 XrdOucString emsg;
04007 if (ps) {
04008 if (ps->IsValid() && (ps->Status() == kXPD_running) &&
04009 !(ps->SrvType() == kXPD_Master)) {
04010 XrdProofGroup *g = (ps->Group() && bp->fGroupMgr)
04011 ? bp->fGroupMgr->GetGroup(ps->Group()) : 0;
04012 TRACE(DBG, "group: "<< g<<", client: "<<ps->Client());
04013 if (g && g->Active() > 0) {
04014 TRACE(DBG, "priority: "<< g->Priority()<<" active: "<<g->Active());
04015 int prio = (int) (g->Priority() * 100);
04016 ps->BroadcastPriority(prio);
04017 nb++;
04018 }
04019 }
04020
04021 return 0;
04022 } else {
04023 emsg = "input entry undefined";
04024 }
04025
04026
04027 TRACE(XERR,"protocol error: "<<emsg);
04028 return 1;
04029 }
04030
04031
04032 void XrdProofdProofServMgr::BroadcastClusterInfo()
04033 {
04034
04035 XPDLOC(SMGR, "ProofServMgr::BroadcastClusterInfo")
04036
04037 TRACE(REQ, "enter");
04038
04039 int tot = 0, act = 0;
04040 std::list<XrdProofdProofServ *>::iterator si = fActiveSessions.begin();
04041 while (si != fActiveSessions.end()) {
04042 if ((*si)->SrvType() != kXPD_Worker) {
04043 tot++;
04044 if ((*si)->Status() == kXPD_running) act++;
04045 }
04046 si++;
04047 }
04048 XPDPRT("tot: "<<tot<<", act: "<<act);
04049
04050 si = fActiveSessions.begin();
04051 while (si != fActiveSessions.end()) {
04052 if ((*si)->Status() == kXPD_running) (*si)->SendClusterInfo(tot, act);
04053 si++;
04054 }
04055 }
04056
04057
04058 int XrdProofdProofServMgr::BroadcastPriorities()
04059 {
04060
04061
04062 XPDLOC(SMGR, "ProofServMgr::BroadcastPriorities")
04063
04064 TRACE(REQ, "enter");
04065
04066 int nb = 0;
04067 XpdBroadcastPriority_t bp = { (fMgr ? fMgr->GroupsMgr() : 0), &nb };
04068 fSessions.Apply(BroadcastPriority, (void *)&bp);
04069
04070 return nb;
04071 }
04072
04073
04074 bool XrdProofdProofServMgr::IsReconnecting()
04075 {
04076
04077
04078
04079
04080 int rect = -1;
04081 if (fReconnectTime >= 0) {
04082 rect = time(0) - fReconnectTime;
04083 if (rect < fReconnectTimeOut)
04084 return true;
04085 }
04086
04087 return false;
04088 }
04089
04090
04091 void XrdProofdProofServMgr::SetReconnectTime(bool on)
04092 {
04093
04094
04095
04096 XrdSysMutexHelper mhp(fMutex);
04097
04098 if (on) {
04099 fReconnectTime = time(0);
04100 } else {
04101 fReconnectTime = -1;
04102 }
04103 }
04104
04105
04106 static int FreeClientID(const char *, XrdProofdProofServ *ps, void *s)
04107 {
04108
04109 XPDLOC(SMGR, "FreeClientID")
04110
04111 int pid = *((int *)s);
04112
04113 if (ps) {
04114 ps->FreeClientID(pid);
04115
04116 return 0;
04117 }
04118
04119
04120 TRACE(XERR, "protocol error: undefined session!");
04121 return 1;
04122 }
04123
04124
04125 void XrdProofdProofServMgr::DisconnectFromProofServ(int pid)
04126 {
04127
04128
04129
04130 XrdSysMutexHelper mhp(fMutex);
04131
04132 fSessions.Apply(FreeClientID, (void *)&pid);
04133 }
04134
04135
04136 static int CountTopMasters(const char *, XrdProofdProofServ *ps, void *s)
04137 {
04138
04139 XPDLOC(SMGR, "CountTopMasters")
04140
04141 int *ntm = (int *)s;
04142
04143 XrdOucString emsg;
04144 if (ps) {
04145 if (ps->SrvType() == kXPD_TopMaster) (*ntm)++;
04146
04147 return 0;
04148 } else {
04149 emsg = "input entry undefined";
04150 }
04151
04152
04153 TRACE(XERR,"protocol error: "<<emsg);
04154 return 1;
04155 }
04156
04157
04158 int XrdProofdProofServMgr::CurrentSessions(bool recalculate)
04159 {
04160
04161
04162 XPDLOC(SMGR, "ProofServMgr::CurrentSessions")
04163
04164 TRACE(REQ, "enter");
04165
04166 XrdSysMutexHelper mhp(fMutex);
04167 if (recalculate) {
04168 fCurrentSessions = 0;
04169 fSessions.Apply(CountTopMasters, (void *)&fCurrentSessions);
04170 }
04171
04172
04173 return fCurrentSessions;
04174 }
04175
04176
04177 void XrdProofdProofServMgr::ResolveKeywords(XrdOucString &s, ProofServEnv_t *in)
04178 {
04179
04180
04181
04182 if (!in) return;
04183
04184 bool isWorker = 0;
04185 if (in->fPS->SrvType() == kXPD_Worker) isWorker = 1;
04186
04187
04188 if (!isWorker && s.find("<logfilemst>") != STR_NPOS) {
04189 XrdOucString lfr(in->fLogFile);
04190 if (lfr.endswith(".log")) lfr.erase(lfr.rfind(".log"));
04191 s.replace("<logfilemst>", lfr);
04192 } else if (isWorker && s.find("<logfilewrk>") != STR_NPOS) {
04193 XrdOucString lfr(in->fLogFile);
04194 if (lfr.endswith(".log")) lfr.erase(lfr.rfind(".log"));
04195 s.replace("<logfilewrk>", lfr);
04196 }
04197
04198
04199 if (getenv("USER") && s.find("<user>") != STR_NPOS) {
04200 XrdOucString usr(getenv("USER"));
04201 s.replace("<user>", usr);
04202 }
04203
04204
04205 if (getenv("ROOTSYS") && s.find("<rootsys>") != STR_NPOS) {
04206 XrdOucString rootsys(getenv("ROOTSYS"));
04207 s.replace("<rootsys>", rootsys);
04208 }
04209 }
04210
04211
04212
04213
04214
04215
04216 XrdProofSessionInfo::XrdProofSessionInfo(XrdProofdClient *c, XrdProofdProofServ *s)
04217 {
04218
04219
04220 fLastAccess = 0;
04221
04222
04223 fUser = c ? c->User() : "";
04224 fGroup = c ? c->Group() : "";
04225
04226
04227 fPid = s ? s->SrvPID() : -1;
04228 fID = s ? s->ID() : -1;
04229 fSrvType = s ? s->SrvType() : -1;
04230 fStatus = s ? s->Status() : kXPD_unknown;
04231 fOrdinal = s ? s->Ordinal() : "";
04232 fTag = s ? s->Tag() : "";
04233 fAlias = s ? s->Alias() : "";
04234 fLogFile = s ? s->Fileout() : "";
04235 fROOTTag = (s && s->ROOT())? s->ROOT()->Tag() : "";
04236 fSrvProtVers = (s && s->ROOT()) ? s->ROOT()->SrvProtVers() : -1;
04237 fUserEnvs = s ? s->UserEnvs() : "";
04238 fAdminPath = s ? s->AdminPath() : "";
04239 fUnixPath = s ? s->UNIXSockPath() : "";
04240 }
04241
04242
04243 void XrdProofSessionInfo::FillProofServ(XrdProofdProofServ &s, XrdROOTMgr *rmgr)
04244 {
04245
04246 XPDLOC(SMGR, "SessionInfo::FillProofServ")
04247
04248 s.SetClient(fUser.c_str());
04249 s.SetGroup(fGroup.c_str());
04250 if (fPid > 0)
04251 s.SetSrvPID(fPid);
04252 if (fID >= 0)
04253 s.SetID(fID);
04254 s.SetSrvType(fSrvType);
04255 s.SetStatus(fStatus);
04256 s.SetOrdinal(fOrdinal.c_str());
04257 s.SetTag(fTag.c_str());
04258 s.SetAlias(fAlias.c_str());
04259 s.SetFileout(fLogFile.c_str());
04260 if (rmgr) {
04261 if (rmgr->GetVersion(fROOTTag.c_str())) {
04262 s.SetROOT(rmgr->GetVersion(fROOTTag.c_str()));
04263 } else {
04264 TRACE(ALL, "ROOT version '"<< fROOTTag <<
04265 "' not availabe anymore: setting the default");
04266 s.SetROOT(rmgr->DefaultVersion());
04267 }
04268 }
04269 s.SetUserEnvs(fUserEnvs.c_str());
04270 s.SetAdminPath(fAdminPath.c_str(), 0);
04271 s.SetUNIXSockPath(fUnixPath.c_str());
04272 }
04273
04274
04275 int XrdProofSessionInfo::SaveToFile(const char *file)
04276 {
04277
04278 XPDLOC(SMGR, "SessionInfo::SaveToFile")
04279
04280
04281 if (!file || strlen(file) <= 0) {
04282 TRACE(XERR,"invalid input: "<<file);
04283 return -1;
04284 }
04285
04286
04287 FILE *fpid = fopen(file, "w");
04288 if (fpid) {
04289 fprintf(fpid, "%s %s\n", fUser.c_str(), fGroup.c_str());
04290 fprintf(fpid, "%s\n", fUnixPath.c_str());
04291 fprintf(fpid, "%d %d %d\n", fPid, fID, fSrvType);
04292 fprintf(fpid, "%s %s %s\n", fOrdinal.c_str(), fTag.c_str(), fAlias.c_str());
04293 fprintf(fpid, "%s\n", fLogFile.c_str());
04294 fprintf(fpid, "%d %s\n", fSrvProtVers, fROOTTag.c_str());
04295 if (fUserEnvs.length() > 0)
04296 fprintf(fpid, "\n%s", fUserEnvs.c_str());
04297 fclose(fpid);
04298
04299
04300
04301 if (chmod(file, 0666) != 0) {
04302 TRACE(XERR, "could not change mode to 0666 on file "<<
04303 file<<"; error: "<<errno);
04304 }
04305
04306 return 0;
04307 }
04308
04309 TRACE(XERR,"session pid file cannot be (re-)created: "<<
04310 file<<"; error: "<<errno);
04311 return -1;
04312 }
04313
04314
04315 void XrdProofSessionInfo::Reset()
04316 {
04317
04318
04319 fLastAccess = 0;
04320 fUser = "";
04321 fGroup = "";
04322 fAdminPath = "";
04323 fUnixPath = "";
04324 fPid = -1;
04325 fStatus = kXPD_unknown;
04326 fID = -1;
04327 fSrvType = -1;
04328 fOrdinal = "";
04329 fTag = "";
04330 fAlias = "";
04331 fLogFile = "";
04332 fROOTTag = "";
04333 fSrvProtVers = -1;
04334 fUserEnvs = "";
04335 }
04336
04337
04338 int XrdProofSessionInfo::ReadFromFile(const char *file)
04339 {
04340
04341 XPDLOC(SMGR, "SessionInfo::ReadFromFile")
04342
04343
04344 if (!file || strlen(file) <= 0) {
04345 TRACE(XERR,"invalid input: "<<file);
04346 return -1;
04347 }
04348
04349 Reset();
04350
04351
04352 FILE *fpid = fopen(file,"r");
04353 if (fpid) {
04354 char line[4096];
04355 char v1[512], v2[512], v3[512];
04356 if (fgets(line, sizeof(line), fpid)) {
04357 if (sscanf(line, "%s %s", v1, v2) == 2) {
04358 fUser = v1;
04359 fGroup = v2;
04360 } else {
04361 TRACE(XERR,"warning: corrupted line? "<<line);
04362 }
04363 }
04364 if (fgets(line, sizeof(line), fpid)) {
04365 int l = strlen(line);
04366 if (line[l-1] == '\n') line[l-1] = '\0';
04367 fUnixPath = line;
04368 }
04369 if (fgets(line, sizeof(line), fpid)) {
04370 sscanf(line, "%d %d %d", &fPid, &fID, &fSrvType);
04371 }
04372 if (fgets(line, sizeof(line), fpid)) {
04373 int ns = 0;
04374 if ((ns = sscanf(line, "%s %s %s", v1, v2, v3)) >= 2) {
04375 fOrdinal = v1;
04376 fTag = v2;
04377 fAlias = (ns == 3) ? v3 : "";
04378 } else {
04379 TRACE(XERR,"warning: corrupted line? "<<line);
04380 }
04381 }
04382 if (fgets(line, sizeof(line), fpid)) {
04383 fLogFile = line;
04384 }
04385 if (fgets(line, sizeof(line), fpid)) {
04386 if (sscanf(line, "%d %s", &fSrvProtVers, v1) == 2) {
04387 fROOTTag = v1;
04388 } else {
04389 TRACE(XERR,"warning: corrupted line? "<<line);
04390 }
04391 }
04392
04393 fUserEnvs = "";
04394 off_t lnow = lseek(fileno(fpid), (off_t) 0, SEEK_CUR);
04395 off_t ltot = lseek(fileno(fpid), (off_t) 0, SEEK_END);
04396 int left = (int)(ltot - lnow);
04397 int len = -1;
04398 do {
04399 int wanted = (left > 4095) ? 4095 : left;
04400 while ((len = read(fileno(fpid), line, wanted)) < 0 &&
04401 errno == EINTR)
04402 errno = 0;
04403 if (len < wanted) {
04404 break;
04405 } else {
04406 line[len] = '\0';
04407 fUserEnvs += line;
04408 }
04409
04410 left -= len;
04411 } while (len > 0 && left > 0);
04412
04413 fclose(fpid);
04414
04415 fAdminPath = file;
04416
04417 struct stat st;
04418 if (!stat(file, &st))
04419 fLastAccess = st.st_atime;
04420 } else {
04421 TRACE(XERR,"session file cannot be open: "<< file<<"; error: "<<errno);
04422 return -1;
04423 }
04424
04425
04426 XrdOucString fs(file);
04427 fs += ".status";
04428 fpid = fopen(fs.c_str(),"r");
04429 if (fpid) {
04430 char line[64];
04431 if (fgets(line, sizeof(line), fpid)) {
04432 sscanf(line, "%d", &fStatus);
04433 }
04434
04435 fclose(fpid);
04436 } else {
04437 TRACE(DBG,"no session status file for: "<< fs<<"; session was probably terminated");
04438 }
04439
04440
04441 return 0;
04442 }
04443
04444
04445 int XpdEnv::Matches(const char *usr, const char *grp, int svn, int ver)
04446 {
04447
04448
04449
04450
04451 XPDLOC(SMGR, "XpdEnv::Matches")
04452
04453 int nmtc = -1;
04454
04455 if (fUsers.length() > 0) {
04456 XrdOucString u(usr);
04457 if ((nmtc = u.matches(fUsers.c_str())) == 0) return -1;
04458 } else {
04459 nmtc = strlen(usr);
04460 }
04461 nmtc += 1000;
04462
04463 int nmtcg = -1;
04464 if (fGroups.length() > 0) {
04465 XrdOucString g(grp);
04466 if ((nmtcg = g.matches(fGroups.c_str())) == 0) return -1;
04467 } else {
04468 nmtcg = strlen(grp);
04469 }
04470 nmtc += nmtcg;
04471
04472 TRACE(HDBG, fEnv <<", u:"<<usr<<", g:"<<grp<<" --> nmtc: "<<nmtc);
04473
04474
04475 TRACE(HDBG, fEnv <<", svn:"<<svn);
04476 if (fSvnMin > 0 && svn < fSvnMin) return -1;
04477 if (fSvnMax > 0 && svn > fSvnMax) return -1;
04478
04479
04480 TRACE(HDBG, fEnv <<", ver:"<<ver);
04481 if (fVerMin > 0 && ver < fVerMin) return -1;
04482 if (fVerMax > 0 && ver > fVerMax) return -1;
04483
04484
04485 return nmtc;
04486 }
04487
04488
04489 int XpdEnv::ToVersCode(int ver, bool hex)
04490 {
04491
04492
04493
04494 int maj = -1, min = -1, ptc = -1, xv = ver;
04495 if (hex) {
04496 maj = xv / 65536;
04497 xv -= maj * 65536;
04498 min = xv / 256;
04499 ptc = xv - min * 256;
04500 } else {
04501 maj = xv / 10000;
04502 xv -= maj * 10000;
04503 min = xv / 100;
04504 ptc = xv - min * 100;
04505 }
04506
04507 int vc = (maj << 16) + (min << 8) + ptc;
04508 return vc;
04509 }
04510
04511
04512 void XpdEnv::Print(const char *what)
04513 {
04514
04515 XPDLOC(SMGR, what)
04516
04517 XrdOucString vmi("-1"), vmx("-1");
04518 if (fVerMin > 0) {
04519 int maj = (fVerMin >> 16);
04520 int min = ((fVerMin - maj * 65536) >> 8);
04521 int ptc = fVerMin - maj * 65536 - min * 256;
04522 XPDFORM(vmi, "%d%d%d", maj, min, ptc);
04523 }
04524 if (fVerMax > 0) {
04525 int maj = (fVerMax >> 16);
04526 int min = ((fVerMax - maj * 65536) >> 8);
04527 int ptc = fVerMax - maj * 65536 - min * 256;
04528 XPDFORM(vmx, "%d%d%d", maj, min, ptc);
04529 }
04530 XrdOucString u("allusers"), g("allgroups");
04531 if (fUsers.length() > 0) u = fUsers;
04532 if (fGroups.length() > 0) u = fGroups;
04533
04534 TRACE(ALL, "'"<<fEnv<<"' {"<<u<<"|"<<g<<
04535 "} svn:["<<fSvnMin<<","<<fSvnMax<<"] vers:["<<vmi<<","<<vmx<<"]");
04536 }