00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <sys/stat.h>
00012
00013 #include "XrdNet/XrdNet.hh"
00014
00015 #include "XrdProofdAux.h"
00016 #include "XrdProofdProofServ.h"
00017 #include "XrdProofWorker.h"
00018 #include "XrdProofSched.h"
00019 #include "XrdProofdManager.h"
00020
00021
00022 #include "XrdProofdTrace.h"
00023
// SafeDelete(x): delete the object pointed to by x (if any) and null x.
// The body is wrapped in do { } while (0) so that the macro expands to a
// single statement and can be used safely in an unbraced if / else.
#ifndef SafeDelete
#define SafeDelete(x) do { if (x) { delete (x); (x) = 0; } } while (0)
#endif
// SafeDelArray(x): same as SafeDelete, for arrays allocated with new[].
#ifndef SafeDelArray
#define SafeDelArray(x) do { if (x) { delete[] (x); (x) = 0; } } while (0)
#endif
00030
00031
00032 XrdProofdProofServ::XrdProofdProofServ()
00033 {
00034
00035
00036 fMutex = new XrdSysRecMutex;
00037 fResponse = 0;
00038 fProtocol = 0;
00039 fParent = 0;
00040 fPingSem = 0;
00041 fStartMsg = 0;
00042 fStatus = kXPD_idle;
00043 fSrvPID = -1;
00044 fSrvType = kXPD_AnyServer;
00045 fID = -1;
00046 fIsShutdown = false;
00047 fIsValid = true;
00048 fSkipCheck = false;
00049 fProtVer = -1;
00050 fNClients = 0;
00051 fClients.reserve(10);
00052 fDisconnectTime = -1;
00053 fSetIdleTime = time(0);
00054 fROOT = 0;
00055
00056 fAdminPath = "";
00057 fAlias = "";
00058 fClient = "";
00059 fFileout = "";
00060 fGroup = "";
00061 fOrdinal = "";
00062 fTag = "";
00063 fUserEnvs = "";
00064 fUNIXSock = 0;
00065 fUNIXSockPath = "";
00066 fQueries.clear();
00067 }
00068
00069
00070 XrdProofdProofServ::~XrdProofdProofServ()
00071 {
00072
00073
00074 SafeDelete(fStartMsg);
00075 SafeDelete(fPingSem);
00076
00077 std::vector<XrdClientID *>::iterator i;
00078 for (i = fClients.begin(); i != fClients.end(); i++)
00079 if (*i)
00080 delete (*i);
00081 fClients.clear();
00082
00083
00084 ClearWorkers();
00085
00086
00087 fQueries.clear();
00088
00089
00090 unlink(fUNIXSockPath.c_str());
00091
00092 SafeDelete(fMutex);
00093 }
00094
00095
00096 static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
00097 {
00098
00099 XPDLOC(PMGR, "DecreaseWorkerCounters")
00100
00101 XrdProofdProofServ *xps = (XrdProofdProofServ *)x;
00102
00103 if (w && xps) {
00104 w->RemoveProofServ(xps);
00105 TRACE(REQ, w->fHost.c_str() <<" done");
00106
00107 return 0;
00108 }
00109
00110
00111 return 1;
00112 }
00113
00114
00115 static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
00116 {
00117
00118 XPDLOC(PMGR, "DumpWorkerCounters")
00119
00120 if (w) {
00121 TRACE(ALL, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
00122
00123 return 0;
00124 }
00125
00126
00127 return 1;
00128 }
00129
00130
00131 void XrdProofdProofServ::ClearWorkers()
00132 {
00133
00134
00135 XrdSysMutexHelper mhp(fMutex);
00136
00137
00138 fWorkers.Apply(DecreaseWorkerCounters, this);
00139 fWorkers.Purge();
00140 }
00141
00142
00143 void XrdProofdProofServ::AddWorker(const char *o, XrdProofWorker *w)
00144 {
00145
00146
00147 if (!o || !w) return;
00148
00149 XrdSysMutexHelper mhp(fMutex);
00150
00151 fWorkers.Add(o, w, 0, Hash_keepdata);
00152 }
00153
00154
00155 void XrdProofdProofServ::RemoveWorker(const char *o)
00156 {
00157
00158 XPDLOC(SMGR, "ProofServ::RemoveWorker")
00159
00160 if (!o) return;
00161
00162 TRACE(DBG,"removing: "<<o);
00163
00164 XrdSysMutexHelper mhp(fMutex);
00165
00166 XrdProofWorker *w = fWorkers.Find(o);
00167 if (w) w->RemoveProofServ(this);
00168 fWorkers.Del(o);
00169 if (TRACING(HDBG)) fWorkers.Apply(DumpWorkerCounters, 0);
00170 }
00171
00172
00173 int XrdProofdProofServ::Reset(const char *msg, int type)
00174 {
00175
00176
00177 XPDLOC(SMGR, "ProofServ::Reset")
00178
00179 int rc = 0;
00180
00181 int st = -1;
00182 XrdOucString fn;
00183 XPDFORM(fn, "%s.status", fAdminPath.c_str());
00184 FILE *fpid = fopen(fn.c_str(), "r");
00185 if (fpid) {
00186 if (fscanf(fpid, "%d", &st) <= 0)
00187 TRACE(XERR,"problems reading from file "<<fn);
00188 fclose(fpid);
00189 }
00190 TRACE(DBG,"file: "<<fn<<", st:"<<st);
00191 XrdSysMutexHelper mhp(fMutex);
00192
00193 if (st == 4) {
00194 Broadcast("idle-timeout", type);
00195 } else {
00196 Broadcast(msg, type);
00197 }
00198
00199 if (fSrvType == kXPD_TopMaster) rc = 1;
00200
00201 Reset();
00202
00203 return rc;
00204 }
00205
00206
00207 void XrdProofdProofServ::Reset()
00208 {
00209
00210 XrdSysMutexHelper mhp(fMutex);
00211
00212 fResponse = 0;
00213 fProtocol = 0;
00214 fParent = 0;
00215 SafeDelete(fStartMsg);
00216 SafeDelete(fPingSem);
00217 fSrvPID = -1;
00218 fID = -1;
00219 fIsShutdown = false;
00220 fIsValid = false;
00221 fSkipCheck = false;
00222 fProtVer = -1;
00223 fNClients = 0;
00224 fClients.clear();
00225 fDisconnectTime = -1;
00226 fSetIdleTime = -1;
00227 fROOT = 0;
00228
00229 ClearWorkers();
00230
00231 fSrvType = kXPD_AnyServer;
00232 fStatus = kXPD_idle;
00233
00234 fQueries.clear();
00235
00236 fAdminPath = "";
00237 fAlias = "";
00238 fClient = "";
00239 fFileout = "";
00240 fGroup = "";
00241 fOrdinal = "";
00242 fTag = "";
00243 fUserEnvs = "";
00244 DeleteUNIXSock();
00245 }
00246
00247
00248 void XrdProofdProofServ::DeleteUNIXSock()
00249 {
00250
00251
00252 SafeDelete(fUNIXSock);
00253 unlink(fUNIXSockPath.c_str());
00254 fUNIXSockPath = "";
00255 }
00256
00257
00258 bool XrdProofdProofServ::SkipCheck()
00259 {
00260
00261
00262 XrdSysMutexHelper mhp(fMutex);
00263
00264 bool rc = fSkipCheck;
00265 fSkipCheck = false;
00266 return rc;
00267 }
00268
00269
00270 XrdClientID *XrdProofdProofServ::GetClientID(int cid)
00271 {
00272
00273 XPDLOC(SMGR, "ProofServ::GetClientID")
00274
00275 XrdClientID *csid = 0;
00276
00277 if (cid < 0) {
00278 TRACE(XERR, "negative ID: protocol error!");
00279 return csid;
00280 }
00281
00282 XrdOucString msg;
00283 { XrdSysMutexHelper mhp(fMutex);
00284
00285
00286 fNClients++;
00287
00288
00289
00290 if (cid < (int)fClients.size()) {
00291 csid = fClients.at(cid);
00292 csid->Reset();
00293
00294
00295 if (TRACING(DBG)) {
00296 XPDFORM(msg, "cid: %d, size: %d", cid, fClients.size());
00297 }
00298 }
00299
00300 if (!csid) {
00301
00302 if (cid >= (int)fClients.capacity())
00303 fClients.reserve(2*fClients.capacity());
00304
00305
00306 int ic = (int)fClients.size();
00307 for (; ic <= cid; ic++)
00308 fClients.push_back((csid = new XrdClientID()));
00309
00310
00311 if (TRACING(DBG)) {
00312 XPDFORM(msg, "cid: %d, new size: %d", cid, fClients.size());
00313 }
00314 }
00315 }
00316 TRACE(DBG, msg);
00317
00318
00319 return csid;
00320 }
00321
00322
00323 int XrdProofdProofServ::FreeClientID(int pid)
00324 {
00325
00326 XPDLOC(SMGR, "ProofServ::FreeClientID")
00327
00328 TRACE(DBG, "svrPID: "<<fSrvPID<< ", pid: "<<pid<<", session status: "<<
00329 fStatus<<", # clients: "<< fNClients);
00330 int rc = -1;
00331 if (pid <= 0) {
00332 TRACE(XERR, "undefined pid!");
00333 return rc;
00334 }
00335 if (!IsValid()) return rc;
00336
00337 { XrdSysMutexHelper mhp(fMutex);
00338
00339
00340 std::vector<XrdClientID *>::iterator i;
00341 for (i = fClients.begin(); i != fClients.end(); ++i) {
00342 if ((*i) && (*i)->P()) {
00343 if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
00344 (*i)->Reset();
00345 fNClients--;
00346
00347 if (fNClients <= 0)
00348 fDisconnectTime = time(0);
00349 rc = 0;
00350 break;
00351 }
00352 }
00353 }
00354 }
00355 if (TRACING(REQ) && (rc == 0)) {
00356 int spid = SrvPID();
00357 TRACE(REQ, spid<<": slot for client pid: "<<pid<<" has been reset");
00358 }
00359
00360
00361 return rc;
00362 }
00363
00364
00365 int XrdProofdProofServ::GetNClients(bool check)
00366 {
00367
00368
00369
00370 XrdSysMutexHelper mhp(fMutex);
00371
00372 if (check) {
00373 fNClients = 0;
00374
00375 std::vector<XrdClientID *>::iterator i;
00376 for (i = fClients.begin(); i != fClients.end(); ++i) {
00377 if ((*i) && (*i)->P() && (*i)->P()->Link()) fNClients++;
00378 }
00379 }
00380
00381
00382 return fNClients;
00383 }
00384
00385
00386 int XrdProofdProofServ::DisconnectTime()
00387 {
00388
00389
00390
00391 XrdSysMutexHelper mhp(fMutex);
00392
00393 int disct = -1;
00394 if (fDisconnectTime > 0)
00395 disct = time(0) - fDisconnectTime;
00396 return ((disct > 0) ? disct : -1);
00397 }
00398
00399
00400 int XrdProofdProofServ::IdleTime()
00401 {
00402
00403
00404
00405 XrdSysMutexHelper mhp(fMutex);
00406
00407 int idlet = -1;
00408 if (fStatus == kXPD_idle)
00409 idlet = time(0) - fSetIdleTime;
00410 return ((idlet > 0) ? idlet : -1);
00411 }
00412
00413
00414 void XrdProofdProofServ::SetIdle()
00415 {
00416
00417
00418
00419 XrdSysMutexHelper mhp(fMutex);
00420
00421 fStatus = kXPD_idle;
00422 fSetIdleTime = time(0);
00423 }
00424
00425
00426 void XrdProofdProofServ::SetRunning()
00427 {
00428
00429
00430
00431 XrdSysMutexHelper mhp(fMutex);
00432
00433 fStatus = kXPD_running;
00434 fSetIdleTime = -1;
00435 }
00436
00437
00438 void XrdProofdProofServ::Broadcast(const char *msg, int type)
00439 {
00440
00441 XPDLOC(SMGR, "ProofServ::Broadcast")
00442
00443
00444 int clproto = (type >= kXPD_wrkmortem) ? 18 : -1;
00445
00446 XrdOucString m;
00447 int len = 0, nc = 0;
00448 if (msg && (len = strlen(msg)) > 0) {
00449 XrdProofdProtocol *p = 0;
00450 int ic = 0, ncz = 0, sid = -1;
00451 { XrdSysMutexHelper mhp(fMutex); ncz = (int) fClients.size(); }
00452 for (ic = 0; ic < ncz; ic++) {
00453 { XrdSysMutexHelper mhp(fMutex);
00454 p = fClients.at(ic)->P();
00455 sid = fClients.at(ic)->Sid(); }
00456
00457 if (p && XPD_CLNT_VERSION_OK(p, clproto)) {
00458 XrdProofdResponse *response = p->Response(sid);
00459 if (response) {
00460 response->Send(kXR_attn, (XProofActionCode)type, (void *)msg, len);
00461 nc++;
00462 } else {
00463 XPDFORM(m, "response instance for sid: %d not found", sid);
00464 }
00465 }
00466 if (m.length() > 0)
00467 TRACE(XERR, m);
00468 m = "";
00469 }
00470 }
00471 if (TRACING(DBG)) {
00472 XPDFORM(m, "type: %d, message: '%s' notified to %d clients", type, msg, nc);
00473 XPDPRT(m);
00474 }
00475 }
00476
00477
00478 int XrdProofdProofServ::TerminateProofServ(bool changeown)
00479 {
00480
00481
00482
00483
00484
00485
00486 XPDLOC(SMGR, "ProofServ::TerminateProofServ")
00487
00488 int pid = fSrvPID;
00489 TRACE(DBG, "ord: " << fOrdinal << ", pid: " << pid);
00490
00491
00492 if (pid > -1) {
00493 XrdProofUI ui;
00494 XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
00495 if (XrdProofdAux::KillProcess(pid, 0, ui, changeown) != 0) {
00496 TRACE(XERR, "ord: problems signalling process: "<<fSrvPID);
00497 }
00498 XrdSysMutexHelper mhp(fMutex);
00499 fIsShutdown = true;
00500 }
00501
00502
00503 return -1;
00504 }
00505
00506
00507 int XrdProofdProofServ::VerifyProofServ(bool forward)
00508 {
00509
00510
00511
00512
00513
00514
00515
00516 XPDLOC(SMGR, "ProofServ::VerifyProofServ")
00517
00518 TRACE(DBG, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
00519
00520 int rc = 0;
00521 XrdOucString msg;
00522
00523
00524 int len = sizeof(kXR_int32);
00525 char *buf = new char[len];
00526
00527 kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
00528 ifw = static_cast<kXR_int32>(htonl(ifw));
00529 memcpy(buf, &ifw, sizeof(kXR_int32));
00530
00531 { XrdSysMutexHelper mhp(fMutex);
00532
00533 if (!fResponse || fResponse->Send(kXR_attn, kXPD_ping, buf, len) != 0) {
00534 msg = "could not propagate ping to proofsrv";
00535 rc = -1;
00536 }
00537 }
00538
00539 delete[] buf;
00540
00541
00542 if (rc != 0)
00543 TRACE(XERR, msg);
00544
00545
00546 return rc;
00547 }
00548
00549
00550 int XrdProofdProofServ::BroadcastPriority(int priority)
00551 {
00552
00553
00554 XPDLOC(SMGR, "ProofServ::BroadcastPriority")
00555
00556 XrdSysMutexHelper mhp(fMutex);
00557
00558
00559 int len = sizeof(kXR_int32);
00560 char *buf = new char[len];
00561 kXR_int32 itmp = priority;
00562 itmp = static_cast<kXR_int32>(htonl(itmp));
00563 memcpy(buf, &itmp, sizeof(kXR_int32));
00564
00565 if (!fResponse || fResponse->Send(kXR_attn, kXPD_priority, buf, len) != 0) {
00566
00567 TRACE(XERR,"problems telling proofserv");
00568 return -1;
00569 }
00570 TRACE(DBG, "priority "<<priority<<" sent over");
00571
00572 return 0;
00573 }
00574
00575
00576 int XrdProofdProofServ::SendData(int cid, void *buff, int len)
00577 {
00578
00579 XPDLOC(SMGR, "ProofServ::SendData")
00580
00581 TRACE(HDBG, "length: "<<len<<" bytes (cid: "<<cid<<")");
00582
00583 int rs = 0;
00584 XrdOucString msg;
00585
00586
00587 XrdClientID *csid = 0;
00588 { XrdSysMutexHelper mhp(fMutex);
00589 if (cid < 0 || cid > (int)(fClients.size() - 1) || !(csid = fClients.at(cid))) {
00590 XPDFORM(msg, "client ID not found (cid: %d, size: %d)", cid, fClients.size());
00591 rs = -1;
00592 }
00593 if (!rs && !(csid->R())) {
00594 XPDFORM(msg, "client not connected: csid: %p, cid: %d, fSid: %d",
00595 csid, cid, csid->Sid());
00596 rs = -1;
00597 }
00598 }
00599
00600
00601
00602 if (!rs) {
00603 rs = -1;
00604 XrdProofdResponse *response = csid->R() ? csid->R() : 0;
00605 if (response)
00606 if (!response->Send(kXR_attn, kXPD_msg, buff, len))
00607 rs = 0;
00608 } else {
00609
00610 TRACE(XERR, msg);
00611 }
00612
00613
00614 return rs;
00615 }
00616
00617
00618 int XrdProofdProofServ::SendDataN(void *buff, int len)
00619 {
00620
00621
00622 XPDLOC(SMGR, "ProofServ::SendDataN")
00623
00624 TRACE(HDBG, "length: "<<len<<" bytes");
00625
00626 XrdOucString msg;
00627
00628 XrdSysMutexHelper mhp(fMutex);
00629
00630
00631 XrdClientID *csid = 0;
00632 int ic = 0;
00633 for (ic = 0; ic < (int) fClients.size(); ic++) {
00634 if ((csid = fClients.at(ic)) && csid->P()) {
00635 XrdProofdResponse *resp = csid->R();
00636 if (!resp || resp->Send(kXR_attn, kXPD_msg, buff, len) != 0)
00637 return -1;
00638 }
00639 }
00640
00641
00642 return 0;
00643 }
00644
00645
00646 void XrdProofdProofServ::ExportBuf(XrdOucString &buf)
00647 {
00648
00649 XPDLOC(SMGR, "ProofServ::ExportBuf")
00650
00651 buf = "";
00652 int id, status, nc;
00653 XrdOucString tag, alias;
00654 { XrdSysMutexHelper mhp(fMutex);
00655 id = fID;
00656 status = fStatus;
00657 nc = fNClients;
00658 tag = fTag;
00659 alias = fAlias; }
00660 XPDFORM(buf, " | %d %s %s %d %d", id, tag.c_str(), alias.c_str(), status, nc);
00661 TRACE(HDBG, "buf: "<< buf);
00662
00663
00664 return;
00665 }
00666
00667
00668 int XrdProofdProofServ::CreateUNIXSock(XrdSysError *edest)
00669 {
00670
00671 XPDLOC(SMGR, "ProofServ::CreateUNIXSock")
00672
00673 TRACE(DBG, "enter");
00674
00675
00676 if (fUNIXSock) {
00677 TRACE(DBG,"UNIX socket exists already! ("<<fUNIXSockPath<<")");
00678 return 0;
00679 }
00680
00681
00682 fUNIXSock = new XrdNet(edest);
00683
00684
00685 struct stat st;
00686 if (fAdminPath.length() > 0 &&
00687 stat(fAdminPath.c_str(), &st) != 0 && (errno == ENOENT)) {;
00688 FILE *fadm = fopen(fAdminPath.c_str(), "w");
00689 fclose(fadm);
00690 }
00691
00692
00693 bool rm = 0, ok = 0;
00694 if (stat(fUNIXSockPath.c_str(), &st) == 0 || (errno != ENOENT)) rm = 1;
00695 if (rm && unlink(fUNIXSockPath.c_str()) != 0) {
00696 if (!S_ISSOCK(st.st_mode)) {
00697 TRACE(XERR, "non-socket path exists: unable to delete it: " <<fUNIXSockPath);
00698 return -1;
00699 } else {
00700 XPDPRT("WARNING: socket path exists: unable to delete it:"
00701 " try to use it anyway " <<fUNIXSockPath);
00702 ok = 1;
00703 }
00704 }
00705
00706
00707 int fd = 0;
00708 if (!ok) {
00709 if ((fd = open(fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
00710 TRACE(XERR, "unable to create path: " <<fUNIXSockPath);
00711 return -1;
00712 }
00713 close(fd);
00714 }
00715 if (fd > -1) {
00716
00717 if (fUNIXSock->Bind((char *)fUNIXSockPath.c_str())) {
00718 TRACE(XERR, " problems binding to UNIX socket; path: " <<fUNIXSockPath);
00719 return -1;
00720 } else
00721 TRACE(DBG, "path for UNIX for socket is " <<fUNIXSockPath);
00722 } else {
00723 TRACE(XERR, "unable to open / create path for UNIX socket; tried path "<< fUNIXSockPath);
00724 return -1;
00725 }
00726
00727
00728 if (!geteuid()) {
00729 XrdProofUI ui;
00730 XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
00731 if (chown(fUNIXSockPath.c_str(), ui.fUid, ui.fGid) != 0) {
00732 TRACE(XERR, "unable to change ownership of the UNIX socket"<<fUNIXSockPath);
00733 return -1;
00734 }
00735 }
00736
00737
00738 return 0;
00739 }
00740
00741
00742 int XrdProofdProofServ::SetAdminPath(const char *a, bool assert)
00743 {
00744
00745
00746 XPDLOC(SMGR, "ProofServ::SetAdminPath")
00747
00748 XrdSysMutexHelper mhp(fMutex);
00749
00750 fAdminPath = a;
00751
00752
00753 if (!assert) return 0;
00754
00755
00756 struct stat st;
00757 if (stat(a, &st) != 0 && errno == ENOENT) {
00758
00759 FILE *fpid = fopen(a, "w");
00760 if (fpid) {
00761 fclose(fpid);
00762 } else {
00763 TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
00764 return -1;
00765 }
00766 }
00767
00768
00769 XrdOucString fn;
00770 XPDFORM(fn, "%s.status", a);
00771 if (stat(fn.c_str(), &st) != 0 && errno == ENOENT) {
00772
00773 FILE *fpid = fopen(fn.c_str(), "w");
00774 if (fpid) {
00775 fprintf(fpid, "%d", fStatus);
00776 fclose(fpid);
00777 } else {
00778 TRACE(XERR, "unable to open / create status path "<< fn << "; errno = "<<errno);
00779 return -1;
00780 }
00781 }
00782
00783 XrdProofUI ui;
00784 if (XrdProofdAux::GetUserInfo(fClient.c_str(), ui) != 0) {
00785 TRACE(XERR, "unable to get info for user "<<fClient<<"; errno = "<<errno);
00786 return -1;
00787 }
00788 if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
00789 TRACE(XERR, "unable to give ownership of the status file "<< fn << " to user; errno = "<<errno);
00790 return -1;
00791 }
00792
00793 if (stat(fn.c_str(), &st) != 0) {
00794 TRACE(XERR, "creation/assertion of the status path "<< fn << " failed; errno = "<<errno);
00795 return -1;
00796 } else {
00797 TRACE(ALL, "creation/assertion of the status path "<< fn << " was successful!");
00798 }
00799
00800
00801 return 0;
00802 }
00803
00804
00805 int XrdProofdProofServ::Resume()
00806 {
00807
00808
00809
00810 XPDLOC(SMGR, "ProofServ::Resume")
00811
00812 TRACE(REQ, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
00813
00814 int rc = 0;
00815 XrdOucString msg;
00816
00817 { XrdSysMutexHelper mhp(fMutex);
00818
00819 if (!fResponse || fResponse->Send(kXR_attn, kXPD_resume, 0, 0) != 0) {
00820 msg = "could not propagate resume to proofsrv";
00821 rc = -1;
00822 }
00823 }
00824
00825
00826 if (rc != 0)
00827 TRACE(XERR, msg);
00828
00829
00830 return rc;
00831 }
00832
00833
00834 static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
00835 {
00836
00837 XPDLOC(PMGR, "ExportWorkerDescription")
00838
00839 XrdOucString *wrks = (XrdOucString *)s;
00840 if (w && wrks) {
00841
00842 if (w->fType == 'M') {
00843 if (wrks->length() > 0) wrks->insert('&',0);
00844 wrks->insert(w->Export(), 0);
00845 } else {
00846
00847 if (wrks->length() > 0)
00848 (*wrks) += '&';
00849
00850 (*wrks) += w->Export(k);
00851 }
00852 TRACE(HDBG, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
00853
00854 return 0;
00855 }
00856
00857
00858 return 1;
00859 }
00860
00861
00862 void XrdProofdProofServ::ExportWorkers(XrdOucString &wrks)
00863 {
00864
00865
00866 XrdSysMutexHelper mhp(fMutex);
00867 wrks = "";
00868 fWorkers.Apply(ExportWorkerDescription, (void *)&wrks);
00869 }
00870
00871
00872 void XrdProofdProofServ::DumpQueries()
00873 {
00874
00875 XPDLOC(PMGR, "DumpQueries")
00876
00877 XrdSysMutexHelper mhp(fMutex);
00878
00879 TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
00880 TRACE(ALL," +++ client: "<<fClient<<", session: "<< fSrvPID <<
00881 ", # of queries: "<< fQueries.size());
00882 std::list<XrdProofQuery *>::iterator ii;
00883 int i = 0;
00884 for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
00885 i++;
00886 TRACE(ALL," +++ #"<<i<<" tag:"<< (*ii)->GetTag()<<" dset: "<<
00887 (*ii)->GetDSName()<<" size:"<<(*ii)->GetDSSize());
00888 }
00889 TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
00890 }
00891
00892
00893 XrdProofQuery *XrdProofdProofServ::GetQuery(const char *tag)
00894 {
00895
00896 XrdProofQuery *q = 0;
00897 if (!tag || strlen(tag) <= 0) return q;
00898
00899 XrdSysMutexHelper mhp(fMutex);
00900
00901 if (fQueries.size() <= 0) return q;
00902
00903 std::list<XrdProofQuery *>::iterator ii;
00904 for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
00905 q = *ii;
00906 if (!strcmp(tag, q->GetTag())) break;
00907 q = 0;
00908 }
00909
00910 return q;
00911 }
00912
00913
00914 void XrdProofdProofServ::RemoveQuery(const char *tag)
00915 {
00916
00917 XrdProofQuery *q = 0;
00918 if (!tag || strlen(tag) <= 0) return;
00919
00920 XrdSysMutexHelper mhp(fMutex);
00921
00922 if (fQueries.size() <= 0) return;
00923
00924 std::list<XrdProofQuery *>::iterator ii;
00925 for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
00926 q = *ii;
00927 if (!strcmp(tag, q->GetTag())) break;
00928 q = 0;
00929 }
00930
00931 if (q) {
00932 fQueries.remove(q);
00933 delete q;
00934 }
00935
00936
00937 return;
00938 }
00939
00940
00941 static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
00942 {
00943
00944
00945 int *actw = (int *)s;
00946 if (w && actw) {
00947 *actw += w->GetNActiveSessions();
00948
00949 return 0;
00950 }
00951
00952
00953 return 1;
00954 }
00955
00956
00957 void XrdProofdProofServ::SendClusterInfo(int nsess, int nacti)
00958 {
00959
00960
00961
00962 XPDLOC(PMGR, "SendClusterInfo")
00963
00964
00965 if (fWorkers.Num() <= 0) return;
00966
00967 int actw = 0;
00968 fWorkers.Apply(CountEffectiveSessions, (void *)&actw);
00969
00970 int neffs = (actw*1000)/fWorkers.Num();
00971 TRACE(DBG, "# sessions: "<<nsess<<", # active: "<<nacti<<", # effective: "<<neffs/1000.);
00972
00973 XrdSysMutexHelper mhp(fMutex);
00974
00975
00976 int len = 3*sizeof(kXR_int32);
00977 char *buf = new char[len];
00978 kXR_int32 off = 0;
00979 kXR_int32 itmp = nsess;
00980 itmp = static_cast<kXR_int32>(htonl(itmp));
00981 memcpy(buf + off, &itmp, sizeof(kXR_int32));
00982 off += sizeof(kXR_int32);
00983 itmp = nacti;
00984 itmp = static_cast<kXR_int32>(htonl(itmp));
00985 memcpy(buf + off, &itmp, sizeof(kXR_int32));
00986 off += sizeof(kXR_int32);
00987 itmp = neffs;
00988 itmp = static_cast<kXR_int32>(htonl(itmp));
00989 memcpy(buf + off, &itmp, sizeof(kXR_int32));
00990
00991 if (!fResponse || fResponse->Send(kXR_attn, kXPD_clusterinfo, buf, len) != 0) {
00992
00993 TRACE(XERR,"problems sending proofserv");
00994 }
00995
00996 }
00997
00998
00999 int XrdProofdProofServ::CheckSession(bool oldvers, bool isrec,
01000 int shutopt, int shutdel, bool changeown, int &nc)
01001 {
01002
01003
01004
01005 XPDLOC(PMGR, "SendClusterInfo")
01006
01007 XrdOucString emsg;
01008 bool rmsession = 0;
01009 nc = -1;
01010 { XrdSysMutexHelper mhp(fMutex);
01011
01012 bool skipcheck = fSkipCheck;
01013 fSkipCheck = false;
01014
01015 if (!skipcheck || oldvers) {
01016 nc = 0;
01017
01018 std::vector<XrdClientID *>::iterator i;
01019 for (i = fClients.begin(); i != fClients.end(); ++i) {
01020 if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
01021 }
01022
01023 if (nc <= 0 && (!isrec || oldvers)) {
01024 int idlet = -1, disct = -1, now = time(0);
01025 if (fStatus == kXPD_idle)
01026 idlet = now - fSetIdleTime;
01027 if (idlet <= 0) idlet = -1;
01028 if (fDisconnectTime > 0)
01029 disct = now - fDisconnectTime;
01030 if (disct <= 0) disct = -1;
01031 if ((fSrvType != kXPD_TopMaster) ||
01032 (shutopt == 1 && (idlet >= shutdel)) ||
01033 (shutopt == 2 && (disct >= shutdel))) {
01034
01035 if (fSrvPID > -1) {
01036 XrdProofUI ui;
01037 XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
01038 if (XrdProofdAux::KillProcess(fSrvPID, 0, ui, changeown) != 0) {
01039 XPDFORM(emsg, "ord: problems signalling process: %d", fSrvPID);
01040 }
01041 fIsShutdown = true;
01042 }
01043 rmsession = 1;
01044 }
01045 }
01046 }
01047 }
01048
01049 if (emsg.length() > 0) {
01050 TRACE(XERR,emsg.c_str());
01051 }
01052
01053 return rmsession;
01054 }