00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <sys/stat.h>
00023
00024 #include "XrdNet/XrdNet.hh"
00025 #include "XrdSys/XrdSysPriv.hh"
00026
00027 #include "XrdProofdClient.h"
00028 #include "XrdProofdProtocol.h"
00029 #include "XrdProofdProofServ.h"
00030 #include "XrdProofdProofServMgr.h"
00031
00032 #include "XrdProofdTrace.h"
00033
00034
00035 XrdProofdClient::XrdProofdClient(XrdProofUI ui, bool master, bool changeown,
00036 XrdSysError *, const char *adminpath)
00037 : fSandbox(ui, master, changeown)
00038 {
00039
00040 XPDLOC(CMGR, "Client::Client")
00041
00042 fProofServs.clear();
00043 fClients.clear();
00044 fUI = ui;
00045 fROOT = 0;
00046 fIsValid = 0;
00047 fAskedToTouch = 0;
00048 fChangeOwn = changeown;
00049
00050
00051 XPDFORM(fAdminPath, "%s/%s.%s", adminpath, ui.fUser.c_str(), ui.fGroup.c_str());
00052 struct stat st;
00053 if (stat(adminpath, &st) != 0) {
00054 TRACE(XERR, "problems stating admin path "<<adminpath<<"; errno = "<<errno);
00055 return;
00056 }
00057 XrdProofUI effui;
00058 XrdProofdAux::GetUserInfo(st.st_uid, effui);
00059 if (XrdProofdAux::AssertDir(fAdminPath.c_str(), effui, 1) != 0)
00060 return;
00061
00062
00063 if (fSandbox.IsValid()) fIsValid = 1;
00064 }
00065
00066
00067 XrdProofdClient::~XrdProofdClient()
00068 {
00069
00070
00071 }
00072
00073
00074 bool XrdProofdClient::Match(const char *usr, const char *grp)
00075 {
00076
00077
00078 if (!fIsValid) return 0;
00079
00080 bool rc = (usr && !strcmp(usr, User())) ? 1 : 0;
00081 if (rc && grp && strlen(grp) > 0)
00082 rc = (grp && Group() && !strcmp(grp, Group())) ? 1 : 0;
00083
00084 return rc;
00085 }
00086
00087
00088 int XrdProofdClient::GetClientID(XrdProofdProtocol *p)
00089 {
00090
00091
00092 XPDLOC(CMGR, "Client::GetClientID")
00093
00094 XrdClientID *cid = 0;
00095 int ic = 0, sz = 0;
00096 { XrdSysMutexHelper mh(fMutex);
00097 if (!fIsValid) return -1;
00098
00099 for (ic = 0; ic < (int)fClients.size() ; ic++) {
00100 if (fClients[ic] && !fClients[ic]->IsValid()) {
00101 cid = fClients[ic];
00102 cid->Reset();
00103 break;
00104 }
00105 }
00106
00107 if (!cid) {
00108
00109 if (ic >= (int)fClients.capacity())
00110 fClients.reserve(2*fClients.capacity());
00111
00112
00113 cid = new XrdClientID();
00114 fClients.push_back(cid);
00115 sz = fClients.size();
00116 }
00117 }
00118
00119 if (cid) {
00120 cid->SetP(p);
00121
00122 unsigned short sid;
00123 memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
00124 cid->SetSid(sid);
00125 }
00126
00127 TRACE(DBG, "size = "<<sz<<", ic = "<<ic);
00128
00129
00130 return ic;
00131 }
00132
00133
00134 int XrdProofdClient::ReserveClientID(int cid)
00135 {
00136
00137
00138 XPDLOC(CMGR, "Client::ReserveClientID")
00139
00140 if (cid < 0)
00141 return -1;
00142
00143 int sz = 0, newsz = 0;
00144 { XrdSysMutexHelper mh(fMutex);
00145 if (!fIsValid) return -1;
00146 if (cid >= (int)fClients.size()) {
00147
00148
00149 newsz = fClients.capacity();
00150 if (cid >= (int)fClients.capacity()) {
00151 newsz = 2 * fClients.capacity();
00152 newsz = (cid < newsz) ? newsz : cid + 1;
00153 fClients.reserve(newsz);
00154 }
00155
00156
00157 while (cid >= (int)fClients.size())
00158 fClients.push_back(new XrdClientID());
00159 }
00160 sz = fClients.size();
00161 }
00162
00163 TRACE(DBG, "cid = "<<cid<<", size = "<<sz<<", capacity = "<<newsz);
00164
00165
00166 return 0;
00167 }
00168
00169
00170 XrdProofdProofServ *XrdProofdClient::GetFreeServObj()
00171 {
00172
00173
00174 XPDLOC(CMGR, "Client::GetFreeServObj")
00175
00176 int ic = 0, newsz = 0, sz = 0;
00177 XrdProofdProofServ *xps = 0;
00178 XrdOucString msg;
00179 { XrdSysMutexHelper mh(fMutex);
00180 if (!fIsValid) return xps;
00181
00182
00183 for (ic = 0; ic < (int)fProofServs.size() ; ic++) {
00184 if (fProofServs[ic] && !(fProofServs[ic]->IsValid())) {
00185 fProofServs[ic]->SetValid();
00186 break;
00187 }
00188 }
00189
00190
00191 if (ic >= (int)fProofServs.capacity()) {
00192 newsz = 2 * fProofServs.capacity();
00193 fProofServs.reserve(newsz);
00194 }
00195 if (ic >= (int)fProofServs.size()) {
00196
00197 fProofServs.push_back(new XrdProofdProofServ());
00198 }
00199 sz = fProofServs.size();
00200
00201 xps = fProofServs[ic];
00202 xps->SetValid();
00203 xps->SetID(ic);
00204 }
00205
00206
00207 if (TRACING(DBG)) {
00208 if (newsz > 0) {
00209 XPDFORM(msg, "new capacity = %d, size = %d, ic = %d, xps = %p",
00210 newsz, sz, ic, xps);
00211 } else {
00212 XPDFORM(msg, "size = %d, ic = %d, xps = %p", sz, ic, xps);
00213 }
00214 XPDPRT(msg);
00215 }
00216
00217
00218 return xps;
00219 }
00220
00221
00222 XrdProofdProofServ *XrdProofdClient::GetServObj(int id)
00223 {
00224
00225 XPDLOC(CMGR, "Client::GetServObj")
00226
00227 TRACE(DBG, "id: "<< id);
00228
00229 if (id < 0) {
00230 TRACE(XERR, "invalid input: id: "<< id);
00231 return (XrdProofdProofServ *)0;
00232 }
00233
00234 XrdOucString dmsg, emsg;
00235 XrdProofdProofServ *xps = 0;
00236 int siz = 0, cap = 0;
00237 { XrdSysMutexHelper mh(fMutex);
00238 if (!fIsValid) return xps;
00239 siz = fProofServs.size();
00240 cap = fProofServs.capacity();
00241 }
00242 TRACE(DBG, "size = "<<siz<<"; capacity = "<<cap);
00243
00244 bool newcap = 0;
00245 { XrdSysMutexHelper mh(fMutex);
00246 if (!fIsValid) return xps;
00247 if (id < (int)fProofServs.size()) {
00248 if (!(xps = fProofServs[id])) {
00249 emsg = "instance in use or undefined! protocol error";
00250 }
00251 } else {
00252
00253 if (id >= (int)fProofServs.capacity()) {
00254 int newsz = 2 * fProofServs.capacity();
00255 newsz = (id < newsz) ? newsz : id+1;
00256 fProofServs.reserve(newsz);
00257 cap = fProofServs.capacity();
00258 newcap = 1;
00259 }
00260 int nnew = id - fProofServs.size() + 1;
00261 while (nnew--)
00262 fProofServs.push_back(new XrdProofdProofServ());
00263 xps = fProofServs[id];
00264 }
00265 }
00266 xps->SetID(id);
00267 xps->SetValid();
00268 if (TRACING(DBG)) {
00269 { XrdSysMutexHelper mh(fMutex);
00270 if (fIsValid) {
00271 siz = fProofServs.size();
00272 cap = fProofServs.capacity();
00273 }
00274 }
00275 TRACE(DBG, "size = "<<siz<<" (capacity = "<<cap<<"); id = "<<id);
00276 }
00277
00278
00279 return xps;
00280 }
00281
00282
00283 XrdProofdProofServ *XrdProofdClient::GetServer(XrdProofdProtocol *p)
00284 {
00285
00286 XPDLOC(CMGR, "Client::GetServer")
00287
00288 TRACE(DBG, "enter: p: " << p);
00289
00290
00291 XrdProofdProofServ *xps = 0;
00292 std::vector<XrdProofdProofServ *>::iterator ip;
00293 XrdSysMutexHelper mh(fMutex);
00294 if (!fIsValid) return xps;
00295 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
00296 xps = (*ip);
00297 if (xps && xps->SrvPID() == p->Pid())
00298 break;
00299 xps = 0;
00300 }
00301
00302 return xps;
00303 }
00304
00305
00306 XrdProofdProofServ *XrdProofdClient::GetServer(int psid)
00307 {
00308
00309
00310 XrdSysMutexHelper mh(fMutex);
00311 if (fIsValid && psid > -1 && psid < (int) fProofServs.size())
00312 return fProofServs.at(psid);
00313
00314 return (XrdProofdProofServ *)0;
00315 }
00316
00317
00318 void XrdProofdClient::EraseServer(int psid)
00319 {
00320
00321 XPDLOC(CMGR, "Client::EraseServer")
00322
00323 TRACE(DBG, "enter: psid: " << psid);
00324
00325 XrdProofdProofServ *xps = 0;
00326 std::vector<XrdProofdProofServ *>::iterator ip;
00327 XrdSysMutexHelper mh(fMutex);
00328 if (!fIsValid) return;
00329
00330 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
00331 xps = *ip;
00332 if (xps && xps->Match(psid)) {
00333
00334 xps->Reset();
00335 break;
00336 }
00337 }
00338 }
00339
00340
00341 int XrdProofdClient::GetTopServers()
00342 {
00343
00344 XPDLOC(CMGR, "Client::GetTopServers")
00345
00346 int nv = 0;
00347
00348 XrdProofdProofServ *xps = 0;
00349 std::vector<XrdProofdProofServ *>::iterator ip;
00350 XrdSysMutexHelper mh(fMutex);
00351 if (!fIsValid) return nv;
00352 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
00353 if ((xps = *ip) && xps->IsValid() && (xps->SrvType() == kXPD_TopMaster)) {
00354 TRACE(DBG,"found potentially valid topmaster session: pid "<<xps->SrvPID());
00355 nv++;
00356 }
00357 }
00358
00359
00360 return nv;
00361 }
00362
00363
00364 int XrdProofdClient::ResetClientSlot(int ic)
00365 {
00366
00367 XPDLOC(CMGR, "Client::ResetClientSlot")
00368
00369 TRACE(DBG, "enter: ic: " << ic);
00370
00371 XrdSysMutexHelper mh(fMutex);
00372 if (fIsValid) {
00373 if (ic >= 0 && ic < (int) fClients.size()) {
00374 fClients[ic]->Reset();
00375 return 0;
00376 }
00377 }
00378
00379 return -1;
00380 }
00381
00382
00383 XrdProofdProtocol *XrdProofdClient::GetProtocol(int ic)
00384 {
00385
00386 XPDLOC(CMGR, "Client::GetProtocol")
00387
00388 TRACE(DBG, "enter: ic: " << ic);
00389
00390 XrdProofdProtocol *p = 0;
00391
00392 XrdSysMutexHelper mh(fMutex);
00393 if (fIsValid) {
00394 if (ic >= 0 && ic < (int) fClients.size()) {
00395 p = fClients[ic]->P();
00396 }
00397 }
00398
00399 return p;
00400 }
00401
00402
00403 int XrdProofdClient::SetClientID(int cid, XrdProofdProtocol *p)
00404 {
00405
00406 XPDLOC(CMGR, "Client::SetClientID")
00407
00408 TRACE(DBG, "cid: "<< cid <<", p: " << p);
00409
00410 XrdSysMutexHelper mh(fMutex);
00411 if (!fIsValid) return -1;
00412
00413 if (cid >= 0 && cid < (int) fClients.size()) {
00414 if (fClients[cid] && (fClients[cid]->P() != p))
00415 fClients[cid]->Reset();
00416 fClients[cid]->SetP(p);
00417
00418 unsigned short sid;
00419 memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
00420 fClients[cid]->SetSid(sid);
00421 return 0;
00422 }
00423
00424
00425 return -1;
00426 }
00427
00428
00429 void XrdProofdClient::Broadcast(const char *msg)
00430 {
00431
00432 XPDLOC(CMGR, "Client::Broadcast")
00433
00434 int len = 0;
00435 if (msg && (len = strlen(msg)) > 0) {
00436
00437
00438 int ic = 0;
00439 XrdClientID *cid = 0;
00440 XrdSysMutexHelper mh(fMutex);
00441 for (ic = 0; ic < (int) fClients.size(); ic++) {
00442 if ((cid = fClients.at(ic)) && cid->P() && cid->P()->ConnType() == kXPD_ClientMaster) {
00443
00444 if (cid->P()->Link()) {
00445 TRACE(ALL," sending to: "<<cid->P()->Link()->ID);
00446 XrdProofdResponse *response = cid->R();
00447 if (response)
00448 response->Send(kXR_attn, kXPD_srvmsg, (char *) msg, len);
00449 }
00450 }
00451 }
00452 }
00453 }
00454
00455
00456 int XrdProofdClient::Touch(bool reset)
00457 {
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467 if (reset) {
00468 fAskedToTouch = 0;
00469 return 0;
00470 }
00471
00472
00473 if (fAskedToTouch) return 1;
00474
00475
00476 int ic = 0;
00477 XrdClientID *cid = 0;
00478 XrdSysMutexHelper mh(fMutex);
00479 for (ic = 0; ic < (int) fClients.size(); ic++) {
00480
00481 if ((cid = fClients.at(ic)) && cid->P() && cid->P()->ProofProtocol() > 17) {
00482 if (cid->P()->ConnType() != kXPD_Internal) {
00483 XrdProofdResponse *response = cid->R();
00484 if (response) response->Send(kXR_attn, kXPD_touch, (char *)0, 0);
00485 }
00486 }
00487 }
00488
00489 fAskedToTouch = 1;
00490
00491 return 0;
00492 }
00493
00494
00495 bool XrdProofdClient::VerifySession(XrdProofdProofServ *xps, XrdProofdResponse *r)
00496 {
00497
00498
00499
00500 XPDLOC(CMGR, "Client::VerifySession")
00501
00502 if (!xps || !(xps->IsValid())) {
00503 TRACE(XERR, " session undefined or invalid");
00504 return 0;
00505 }
00506
00507
00508 XrdOucString path(xps->AdminPath());
00509 if (path.length() <= 0) {
00510 TRACE(XERR, "admin path is empty! - protocol error");
00511 return 0;
00512 }
00513 path += ".status";
00514
00515
00516 struct stat st0;
00517 if (stat(path.c_str(), &st0) != 0) {
00518 TRACE(XERR, "cannot stat admin path: "<<path);
00519 return 0;
00520 }
00521 int now = time(0);
00522 if (now >= st0.st_mtime && (now - st0.st_mtime) <= 1) return 1;
00523 TRACE(ALL, "admin path: "<<path<<", mtime: "<< st0.st_mtime << ", now: "<< now);
00524
00525
00526 int pid = xps->SrvPID();
00527
00528 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
00529
00530 if (xps->VerifyProofServ(0) != 0) {
00531 TRACE(XERR, "could not send verify request to proofsrv");
00532 return 0;
00533 }
00534
00535
00536 XrdOucString notmsg;
00537 struct stat st1;
00538 int ns = 10;
00539 while (ns--) {
00540 if (stat(path.c_str(), &st1) == 0) {
00541 if (st1.st_mtime > st0.st_mtime) {
00542 return 1;
00543 }
00544 }
00545
00546 TRACE(HDBG, "waiting "<<ns<<" secs for session "<<pid<<
00547 " to touch the admin path");
00548 if (r && ns == 5) {
00549 XPDFORM(notmsg, "verifying existing sessions, %d seconds ...", ns);
00550 r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notmsg.c_str(), notmsg.length());
00551 }
00552 sleep(1);
00553 }
00554 }
00555
00556
00557 return 0;
00558 }
00559
00560
00561 void XrdProofdClient::SkipSessionsCheck(std::list<XrdProofdProofServ *> *active,
00562 XrdOucString &emsg, XrdProofdResponse *r)
00563 {
00564
00565
00566
00567
00568 XPDLOC(CMGR, "Client::SkipSessionsCheck")
00569
00570 XrdProofdProofServ *xps = 0;
00571 std::vector<XrdProofdProofServ *>::iterator ip;
00572 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
00573 if ((xps = *ip) && xps->IsValid() && (xps->SrvType() == kXPD_TopMaster)) {
00574 if (VerifySession(xps, r)) {
00575 xps->SetSkipCheck();
00576 if (active) active->push_back(xps);
00577 } else {
00578 if (xps->SrvPID() > 0) {
00579 if (emsg.length() <= 0)
00580 emsg = "ignoring (apparently) non-responding session(s): ";
00581 else
00582 emsg += " ";
00583 emsg += xps->SrvPID();
00584 }
00585 TRACE(ALL,"session "<<xps->SrvPID()<<" does not react: dead?");
00586 }
00587 }
00588 }
00589 if (active)
00590 TRACE(HDBG, "found: " << active->size() << " sessions");
00591
00592
00593 return;
00594 }
00595
00596
00597 XrdOucString XrdProofdClient::ExportSessions(XrdOucString &emsg,
00598 XrdProofdResponse *r)
00599 {
00600
00601
00602 XrdOucString out, buf;
00603
00604
00605 std::list<XrdProofdProofServ *> active;
00606 SkipSessionsCheck(&active, emsg, r);
00607
00608
00609 XrdProofdProofServ *xps = 0;
00610 out += (int) active.size();
00611 std::list<XrdProofdProofServ *>::iterator ia;
00612 for (ia = active.begin(); ia != active.end(); ++ia) {
00613 if ((xps = *ia) && xps->IsValid()) {
00614 xps->ExportBuf(buf);
00615 out += buf;
00616 }
00617 }
00618
00619
00620 return out;
00621 }
00622
00623
00624 void XrdProofdClient::TerminateSessions(int srvtype, XrdProofdProofServ *ref,
00625 const char *msg, XrdProofdPipe *pipe,
00626 bool changeown)
00627 {
00628
00629 XPDLOC(CMGR, "Client::TerminateSessions")
00630
00631
00632 int is = 0;
00633 XrdProofdProofServ *s = 0;
00634 for (is = 0; is < (int) fProofServs.size(); is++) {
00635 if ((s = fProofServs.at(is)) && s->IsValid() && (!ref || ref == s) &&
00636 (s->SrvType() == srvtype || (srvtype == kXPD_AnyServer))) {
00637 TRACE(DBG, "terminating " << s->SrvPID());
00638
00639 if (srvtype == kXPD_TopMaster && msg && strlen(msg) > 0)
00640
00641 Broadcast(msg);
00642
00643
00644 s->TerminateProofServ(changeown);
00645
00646
00647 XrdOucString tag = "-";
00648 tag += s->SrvPID();
00649 if (fSandbox.GuessTag(tag, 1) == 0)
00650 fSandbox.RemoveSession(tag.c_str());
00651
00652
00653 if (pipe) {
00654 int rc = 0;
00655 XrdOucString buf(s->AdminPath());
00656 buf.erase(0, buf.rfind('/') + 1);
00657 TRACE(DBG,"posting kSessionRemoval with: '"<<buf<<"'");
00658 if ((rc = pipe->Post(XrdProofdProofServMgr::kSessionRemoval, buf.c_str())) != 0) {
00659 TRACE(XERR, "problem posting the pipe; errno: "<<-rc);
00660 }
00661 }
00662
00663
00664 s->Reset();
00665 }
00666 }
00667 }
00668
00669
00670 void XrdProofdClient::ResetSessions()
00671 {
00672
00673
00674 fAskedToTouch = 0;
00675
00676 XrdSysMutexHelper mh(fMutex);
00677 std::vector<XrdProofdProofServ *>::iterator ip;
00678 for (ip = fProofServs.begin(); ip != fProofServs.end(); ip++) {
00679
00680 if (*ip) (*ip)->Reset();
00681 }
00682 }
00683