XrdProofdProofServ.cxx

Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofdProofServ.cxx 34101 2010-06-24 12:12:59Z ganis $
00002 // Author: Gerardo Ganis  12/12/2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 #include <sys/stat.h>
00012 
00013 #include "XrdNet/XrdNet.hh"
00014 
00015 #include "XrdProofdAux.h"
00016 #include "XrdProofdProofServ.h"
00017 #include "XrdProofWorker.h"
00018 #include "XrdProofSched.h"
00019 #include "XrdProofdManager.h"
00020 
00021 // Tracing utils
00022 #include "XrdProofdTrace.h"
00023 
#ifndef SafeDelete
// Delete the object pointed to by 'x' (if any) and reset the pointer to 0.
// The do { } while (0) wrapper makes 'SafeDelete(p);' a single statement,
// so it is safe inside an unbraced if/else.
#define SafeDelete(x) do { if (x) { delete (x); (x) = 0; } } while (0)
#endif
#ifndef SafeDelArray
// Same as SafeDelete, for arrays allocated with new[].
#define SafeDelArray(x) do { if (x) { delete[] (x); (x) = 0; } } while (0)
#endif
00030 
00031 //__________________________________________________________________________
00032 XrdProofdProofServ::XrdProofdProofServ()
00033 {
00034    // Constructor
00035 
00036    fMutex = new XrdSysRecMutex;
00037    fResponse = 0;
00038    fProtocol = 0;
00039    fParent = 0;
00040    fPingSem = 0;
00041    fStartMsg = 0;
00042    fStatus = kXPD_idle;
00043    fSrvPID = -1;
00044    fSrvType = kXPD_AnyServer;
00045    fID = -1;
00046    fIsShutdown = false;
00047    fIsValid = true;  // It is created for a valid server ...
00048    fSkipCheck = false;
00049    fProtVer = -1;
00050    fNClients = 0;
00051    fClients.reserve(10);
00052    fDisconnectTime = -1;
00053    fSetIdleTime = time(0);
00054    fROOT = 0;
00055    // Strings
00056    fAdminPath = "";
00057    fAlias = "";
00058    fClient = "";
00059    fFileout = "";
00060    fGroup = "";
00061    fOrdinal = "";
00062    fTag = "";
00063    fUserEnvs = "";
00064    fUNIXSock = 0;
00065    fUNIXSockPath = "";
00066    fQueries.clear();
00067 }
00068 
00069 //__________________________________________________________________________
00070 XrdProofdProofServ::~XrdProofdProofServ()
00071 {
00072    // Destructor
00073 
00074    SafeDelete(fStartMsg);
00075    SafeDelete(fPingSem);
00076 
00077    std::vector<XrdClientID *>::iterator i;
00078    for (i = fClients.begin(); i != fClients.end(); i++)
00079        if (*i)
00080           delete (*i);
00081    fClients.clear();
00082 
00083    // Cleanup worker info
00084    ClearWorkers();
00085 
00086    // Cleanup queries info
00087    fQueries.clear();
00088 
00089    // Remove the associated UNIX socket path
00090    unlink(fUNIXSockPath.c_str());
00091 
00092    SafeDelete(fMutex);
00093 }
00094 
00095 //__________________________________________________________________________
00096 static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
00097 {
00098    // Decrease active session counters on worker w
00099    XPDLOC(PMGR, "DecreaseWorkerCounters")
00100 
00101    XrdProofdProofServ *xps = (XrdProofdProofServ *)x;
00102 
00103    if (w && xps) {
00104       w->RemoveProofServ(xps);
00105       TRACE(REQ, w->fHost.c_str() <<" done");
00106       // Check next
00107       return 0;
00108    }
00109 
00110    // Not enough info: stop
00111    return 1;
00112 }
00113 
00114 //__________________________________________________________________________
00115 static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
00116 {
00117    // Decrease active session counters on worker w
00118    XPDLOC(PMGR, "DumpWorkerCounters")
00119 
00120    if (w) {
00121       TRACE(ALL, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
00122       // Check next
00123       return 0;
00124    }
00125 
00126    // Not enough info: stop
00127    return 1;
00128 }
00129 
00130 //__________________________________________________________________________
00131 void XrdProofdProofServ::ClearWorkers()
00132 {
00133    // Decrease worker counters and clean-up the list
00134 
00135    XrdSysMutexHelper mhp(fMutex);
00136 
00137    // Decrease workers' counters and remove this from workers
00138    fWorkers.Apply(DecreaseWorkerCounters, this);
00139    fWorkers.Purge();
00140 }
00141 
00142 //__________________________________________________________________________
00143 void XrdProofdProofServ::AddWorker(const char *o, XrdProofWorker *w)
00144 {
00145    // Add a worker assigned to this session with label 'o'
00146 
00147    if (!o || !w) return;
00148 
00149    XrdSysMutexHelper mhp(fMutex);
00150 
00151    fWorkers.Add(o, w, 0, Hash_keepdata);
00152 }
00153 
00154 //__________________________________________________________________________
00155 void XrdProofdProofServ::RemoveWorker(const char *o)
00156 {
00157    // Release worker assigned to this session with label 'o'
00158    XPDLOC(SMGR, "ProofServ::RemoveWorker")
00159 
00160    if (!o) return;
00161 
00162    TRACE(DBG,"removing: "<<o);
00163 
00164    XrdSysMutexHelper mhp(fMutex);
00165 
00166    XrdProofWorker *w = fWorkers.Find(o);
00167    if (w) w->RemoveProofServ(this);
00168    fWorkers.Del(o);
00169    if (TRACING(HDBG)) fWorkers.Apply(DumpWorkerCounters, 0);
00170 }
00171 
00172 //__________________________________________________________________________
00173 int XrdProofdProofServ::Reset(const char *msg, int type)
00174 {
00175    // Reset this instance, broadcasting a message to the clients.
00176    // return 1 if top master, 0 otherwise
00177    XPDLOC(SMGR, "ProofServ::Reset")
00178 
00179    int rc = 0;
00180    // Read the status file
00181    int st = -1;
00182    XrdOucString fn;
00183    XPDFORM(fn, "%s.status", fAdminPath.c_str());
00184    FILE *fpid = fopen(fn.c_str(), "r");
00185    if (fpid) {
00186       if (fscanf(fpid, "%d", &st) <= 0)
00187          TRACE(XERR,"problems reading from file "<<fn);
00188       fclose(fpid);
00189    }
00190    TRACE(DBG,"file: "<<fn<<", st:"<<st);
00191    XrdSysMutexHelper mhp(fMutex);
00192    // Broadcast msg
00193    if (st == 4) {
00194       Broadcast("idle-timeout", type);
00195    } else {
00196       Broadcast(msg, type);
00197    }
00198    // What kind of server is this?
00199    if (fSrvType == kXPD_TopMaster) rc = 1;
00200    // Reset instance
00201    Reset();
00202    // Done
00203    return rc;
00204 }
00205 
00206 //__________________________________________________________________________
00207 void XrdProofdProofServ::Reset()
00208 {
00209    // Reset this instance
00210    XrdSysMutexHelper mhp(fMutex);
00211 
00212    fResponse = 0;
00213    fProtocol = 0;
00214    fParent = 0;
00215    SafeDelete(fStartMsg);
00216    SafeDelete(fPingSem);
00217    fSrvPID = -1;
00218    fID = -1;
00219    fIsShutdown = false;
00220    fIsValid = false;
00221    fSkipCheck = false;
00222    fProtVer = -1;
00223    fNClients = 0;
00224    fClients.clear();
00225    fDisconnectTime = -1;
00226    fSetIdleTime = -1;
00227    fROOT = 0;
00228    // Cleanup worker info
00229    ClearWorkers();
00230    // ClearWorkers depends on the fSrvType and fStatus
00231    fSrvType = kXPD_AnyServer;
00232    fStatus = kXPD_idle;
00233    // Cleanup queries info
00234    fQueries.clear();
00235    // Strings
00236    fAdminPath = "";
00237    fAlias = "";
00238    fClient = "";
00239    fFileout = "";
00240    fGroup = "";
00241    fOrdinal = "";
00242    fTag = "";
00243    fUserEnvs = "";
00244    DeleteUNIXSock();
00245 }
00246 
00247 //__________________________________________________________________________
00248 void XrdProofdProofServ::DeleteUNIXSock()
00249 {
00250    // Delete the current UNIX socket
00251 
00252    SafeDelete(fUNIXSock);
00253    unlink(fUNIXSockPath.c_str());
00254    fUNIXSockPath = "";
00255 }
00256 
00257 //__________________________________________________________________________
00258 bool XrdProofdProofServ::SkipCheck()
00259 {
00260    // Return the value of fSkipCheck and reset it to false
00261 
00262    XrdSysMutexHelper mhp(fMutex);
00263 
00264    bool rc = fSkipCheck;
00265    fSkipCheck = false;
00266    return rc;
00267 }
00268 
00269 //__________________________________________________________________________
00270 XrdClientID *XrdProofdProofServ::GetClientID(int cid)
00271 {
00272    // Get instance corresponding to cid
00273    XPDLOC(SMGR, "ProofServ::GetClientID")
00274 
00275    XrdClientID *csid = 0;
00276 
00277    if (cid < 0) {
00278       TRACE(XERR, "negative ID: protocol error!");
00279       return csid;
00280    }
00281 
00282    XrdOucString msg;
00283    {  XrdSysMutexHelper mhp(fMutex);
00284 
00285       // Count new attached client
00286       fNClients++;
00287 
00288       // If in the allocate range reset the corresponding instance and
00289       // return it
00290       if (cid < (int)fClients.size()) {
00291          csid = fClients.at(cid);
00292          csid->Reset();
00293 
00294          // Notification message
00295          if (TRACING(DBG)) {
00296             XPDFORM(msg, "cid: %d, size: %d", cid, fClients.size());
00297          }
00298       }
00299 
00300       if (!csid) {
00301          // If not, allocate a new one; we need to resize (double it)
00302          if (cid >= (int)fClients.capacity())
00303             fClients.reserve(2*fClients.capacity());
00304 
00305          // Allocate new elements (for fast access we need all of them)
00306          int ic = (int)fClients.size();
00307          for (; ic <= cid; ic++)
00308             fClients.push_back((csid = new XrdClientID()));
00309 
00310          // Notification message
00311          if (TRACING(DBG)) {
00312             XPDFORM(msg, "cid: %d, new size: %d", cid, fClients.size());
00313          }
00314       }
00315    }
00316    TRACE(DBG, msg);
00317 
00318    // We are done
00319    return csid;
00320 }
00321 
00322 //__________________________________________________________________________
00323 int XrdProofdProofServ::FreeClientID(int pid)
00324 {
00325    // Free instance corresponding to protocol connecting process 'pid'
00326    XPDLOC(SMGR, "ProofServ::FreeClientID")
00327 
00328    TRACE(DBG, "svrPID: "<<fSrvPID<< ", pid: "<<pid<<", session status: "<<
00329               fStatus<<", # clients: "<< fNClients);
00330    int rc = -1;
00331    if (pid <= 0) {
00332       TRACE(XERR, "undefined pid!");
00333       return rc;
00334    }
00335    if (!IsValid()) return rc;
00336 
00337    {  XrdSysMutexHelper mhp(fMutex);
00338 
00339       // Remove this from the list of clients
00340       std::vector<XrdClientID *>::iterator i;
00341       for (i = fClients.begin(); i != fClients.end(); ++i) {
00342          if ((*i) && (*i)->P()) {
00343             if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
00344                (*i)->Reset();
00345                fNClients--;
00346                // Record time of last disconnection
00347                if (fNClients <= 0)
00348                   fDisconnectTime = time(0);
00349                rc = 0;
00350                break;
00351             }
00352          }
00353       }
00354    }
00355    if (TRACING(REQ) && (rc == 0)) {
00356       int spid = SrvPID();
00357       TRACE(REQ, spid<<": slot for client pid: "<<pid<<" has been reset");
00358    }
00359 
00360    // Out of range
00361    return rc;
00362 }
00363 
00364 //__________________________________________________________________________
00365 int XrdProofdProofServ::GetNClients(bool check)
00366 {
00367    // Get the number of connected clients. If check is true check that
00368    // they are still valid ones and free the slots for the invalid ones
00369 
00370    XrdSysMutexHelper mhp(fMutex);
00371 
00372    if (check) {
00373       fNClients = 0;
00374       // Remove this from the list of clients
00375       std::vector<XrdClientID *>::iterator i;
00376       for (i = fClients.begin(); i != fClients.end(); ++i) {
00377          if ((*i) && (*i)->P() && (*i)->P()->Link()) fNClients++;
00378       }
00379    }
00380 
00381    // Done
00382    return fNClients;
00383 }
00384 
00385 //__________________________________________________________________________
00386 int XrdProofdProofServ::DisconnectTime()
00387 {
00388    // Return the time (in secs) all clients have been disconnected.
00389    // Return -1 if the session is running
00390 
00391    XrdSysMutexHelper mhp(fMutex);
00392 
00393    int disct = -1;
00394    if (fDisconnectTime > 0)
00395       disct = time(0) - fDisconnectTime;
00396    return ((disct > 0) ? disct : -1);
00397 }
00398 
00399 //__________________________________________________________________________
00400 int XrdProofdProofServ::IdleTime()
00401 {
00402    // Return the time (in secs) the session has been idle.
00403    // Return -1 if the session is running
00404 
00405    XrdSysMutexHelper mhp(fMutex);
00406 
00407    int idlet = -1;
00408    if (fStatus == kXPD_idle)
00409       idlet = time(0) - fSetIdleTime;
00410    return ((idlet > 0) ? idlet : -1);
00411 }
00412 
00413 //__________________________________________________________________________
00414 void XrdProofdProofServ::SetIdle()
00415 {
00416    // Set status to idle and update the related time stamp
00417    //
00418 
00419    XrdSysMutexHelper mhp(fMutex);
00420 
00421    fStatus = kXPD_idle;
00422    fSetIdleTime = time(0);
00423 }
00424 
00425 //__________________________________________________________________________
00426 void XrdProofdProofServ::SetRunning()
00427 {
00428    // Set status to running and reset the related time stamp
00429    //
00430 
00431    XrdSysMutexHelper mhp(fMutex);
00432 
00433    fStatus = kXPD_running;
00434    fSetIdleTime = -1;
00435 }
00436 
00437 //______________________________________________________________________________
00438 void XrdProofdProofServ::Broadcast(const char *msg, int type)
00439 {
00440    // Broadcast message 'msg' at 'type' to the attached clients
00441    XPDLOC(SMGR, "ProofServ::Broadcast")
00442 
00443    // Backward-compatibility check
00444    int clproto = (type >= kXPD_wrkmortem) ? 18 : -1;
00445 
00446    XrdOucString m;
00447    int len = 0, nc = 0;
00448    if (msg && (len = strlen(msg)) > 0) {
00449       XrdProofdProtocol *p = 0;
00450       int ic = 0, ncz = 0, sid = -1;
00451       { XrdSysMutexHelper mhp(fMutex); ncz = (int) fClients.size(); }
00452       for (ic = 0; ic < ncz; ic++) {
00453          {  XrdSysMutexHelper mhp(fMutex);
00454             p = fClients.at(ic)->P();
00455             sid = fClients.at(ic)->Sid(); }
00456          // Send message
00457          if (p && XPD_CLNT_VERSION_OK(p, clproto)) {
00458             XrdProofdResponse *response = p->Response(sid);
00459             if (response) {
00460                response->Send(kXR_attn, (XProofActionCode)type, (void *)msg, len);
00461                nc++;
00462             } else {
00463                XPDFORM(m, "response instance for sid: %d not found", sid);
00464             }
00465          }
00466          if (m.length() > 0)
00467             TRACE(XERR, m);
00468          m = "";
00469       }
00470    }
00471    if (TRACING(DBG)) {
00472       XPDFORM(m, "type: %d, message: '%s' notified to %d clients", type, msg, nc);
00473       XPDPRT(m);
00474    }
00475 }
00476 
00477 //______________________________________________________________________________
00478 int XrdProofdProofServ::TerminateProofServ(bool changeown)
00479 {
00480    // Terminate the associated process.
00481    // A shutdown interrupt message is forwarded.
00482    // If add is TRUE (default) the pid is added to the list of processes
00483    // requested to terminate.
00484    // Return the pid of tyhe terminated process on success, -1 if not allowed
00485    // or other errors occured.
00486    XPDLOC(SMGR, "ProofServ::TerminateProofServ")
00487 
00488    int pid = fSrvPID;
00489    TRACE(DBG, "ord: " << fOrdinal << ", pid: " << pid);
00490 
00491    // Send a terminate signal to the proofserv
00492    if (pid > -1) {
00493       XrdProofUI ui;
00494       XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
00495       if (XrdProofdAux::KillProcess(pid, 0, ui, changeown) != 0) {
00496          TRACE(XERR, "ord: problems signalling process: "<<fSrvPID);
00497       }
00498       XrdSysMutexHelper mhp(fMutex);
00499       fIsShutdown = true;
00500    }
00501 
00502    // Failed
00503    return -1;
00504 }
00505 
00506 //______________________________________________________________________________
00507 int XrdProofdProofServ::VerifyProofServ(bool forward)
00508 {
00509    // Check if the associated proofserv process is alive. This is done
00510    // asynchronously by asking the process to callback and proof its vitality.
00511    // We do not block here: the caller may setup a waiting structure if
00512    // required.
00513    // If forward is true, the process will forward the request to the following
00514    // tiers.
00515    // Return 0 if the request was send successfully, -1 in case of error.
00516    XPDLOC(SMGR, "ProofServ::VerifyProofServ")
00517 
00518    TRACE(DBG, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
00519 
00520    int rc = 0;
00521    XrdOucString msg;
00522 
00523    // Prepare buffer
00524    int len = sizeof(kXR_int32);
00525    char *buf = new char[len];
00526    // Option
00527    kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
00528    ifw = static_cast<kXR_int32>(htonl(ifw));
00529    memcpy(buf, &ifw, sizeof(kXR_int32));
00530 
00531    {  XrdSysMutexHelper mhp(fMutex);
00532       // Propagate the ping request
00533       if (!fResponse || fResponse->Send(kXR_attn, kXPD_ping, buf, len) != 0) {
00534          msg = "could not propagate ping to proofsrv";
00535          rc = -1;
00536       }
00537    }
00538    // Cleanup
00539    delete[] buf;
00540 
00541    // Notify errors, if any
00542    if (rc != 0)
00543       TRACE(XERR, msg);
00544 
00545    // Done
00546    return rc;
00547 }
00548 
00549 //__________________________________________________________________________
00550 int XrdProofdProofServ::BroadcastPriority(int priority)
00551 {
00552    // Broadcast a new group priority value to the worker servers.
00553    // Called by masters.
00554    XPDLOC(SMGR, "ProofServ::BroadcastPriority")
00555 
00556    XrdSysMutexHelper mhp(fMutex);
00557 
00558    // Prepare buffer
00559    int len = sizeof(kXR_int32);
00560    char *buf = new char[len];
00561    kXR_int32 itmp = priority;
00562    itmp = static_cast<kXR_int32>(htonl(itmp));
00563    memcpy(buf, &itmp, sizeof(kXR_int32));
00564    // Send over
00565    if (!fResponse || fResponse->Send(kXR_attn, kXPD_priority, buf, len) != 0) {
00566       // Failure
00567       TRACE(XERR,"problems telling proofserv");
00568       return -1;
00569    }
00570    TRACE(DBG, "priority "<<priority<<" sent over");
00571    // Done
00572    return 0;
00573 }
00574 
00575 //______________________________________________________________________________
00576 int XrdProofdProofServ::SendData(int cid, void *buff, int len)
00577 {
00578    // Send data to client cid.
00579    XPDLOC(SMGR, "ProofServ::SendData")
00580 
00581    TRACE(HDBG, "length: "<<len<<" bytes (cid: "<<cid<<")");
00582 
00583    int rs = 0;
00584    XrdOucString msg;
00585 
00586    // Get corresponding instance
00587    XrdClientID *csid = 0;
00588    {  XrdSysMutexHelper mhp(fMutex);
00589       if (cid < 0 || cid > (int)(fClients.size() - 1) || !(csid = fClients.at(cid))) {
00590          XPDFORM(msg, "client ID not found (cid: %d, size: %d)", cid, fClients.size());
00591          rs = -1;
00592       }
00593       if (!rs && !(csid->R())) {
00594          XPDFORM(msg, "client not connected: csid: %p, cid: %d, fSid: %d",
00595                        csid, cid, csid->Sid());
00596          rs = -1;
00597       }
00598    }
00599 
00600    //
00601    // The message is strictly for the client requiring it
00602    if (!rs) {
00603       rs = -1;
00604       XrdProofdResponse *response = csid->R() ? csid->R() : 0;
00605       if (response)
00606          if (!response->Send(kXR_attn, kXPD_msg, buff, len))
00607             rs = 0;
00608    } else {
00609       // Notify error
00610       TRACE(XERR, msg);
00611    }
00612 
00613    // Done
00614    return rs;
00615 }
00616 
00617 //______________________________________________________________________________
00618 int XrdProofdProofServ::SendDataN(void *buff, int len)
00619 {
00620    // Send data over the open client links of this session.
00621    // Used when all the connected clients are eligible to receive the message.
00622    XPDLOC(SMGR, "ProofServ::SendDataN")
00623 
00624    TRACE(HDBG, "length: "<<len<<" bytes");
00625 
00626    XrdOucString msg;
00627 
00628    XrdSysMutexHelper mhp(fMutex);
00629 
00630    // Send to connected clients
00631    XrdClientID *csid = 0;
00632    int ic = 0;
00633    for (ic = 0; ic < (int) fClients.size(); ic++) {
00634       if ((csid = fClients.at(ic)) && csid->P()) {
00635          XrdProofdResponse *resp = csid->R();
00636          if (!resp || resp->Send(kXR_attn, kXPD_msg, buff, len) != 0)
00637             return -1;
00638       }
00639    }
00640 
00641    // Done
00642    return 0;
00643 }
00644 
00645 //______________________________________________________________________________
00646 void XrdProofdProofServ::ExportBuf(XrdOucString &buf)
00647 {
00648    // Fill buf with relevant info about this session
00649    XPDLOC(SMGR, "ProofServ::ExportBuf")
00650 
00651    buf = "";
00652    int id, status, nc;
00653    XrdOucString tag, alias;
00654    {  XrdSysMutexHelper mhp(fMutex);
00655       id = fID;
00656       status = fStatus;
00657       nc = fNClients;
00658       tag = fTag;
00659       alias = fAlias; }
00660    XPDFORM(buf, " | %d %s %s %d %d", id, tag.c_str(), alias.c_str(), status, nc);
00661    TRACE(HDBG, "buf: "<< buf);
00662 
00663    // Done
00664    return;
00665 }
00666 
00667 //______________________________________________________________________________
00668 int XrdProofdProofServ::CreateUNIXSock(XrdSysError *edest)
00669 {
00670    // Create UNIX socket for internal connections
00671    XPDLOC(SMGR, "ProofServ::CreateUNIXSock")
00672 
00673    TRACE(DBG, "enter");
00674 
00675    // Make sure we do not have already a socket
00676    if (fUNIXSock) {
00677        TRACE(DBG,"UNIX socket exists already! ("<<fUNIXSockPath<<")");
00678        return 0;
00679    }
00680 
00681    // Create socket
00682    fUNIXSock = new XrdNet(edest);
00683 
00684    // Make sure the admin path exists
00685    struct stat st;
00686    if (fAdminPath.length() > 0 &&
00687        stat(fAdminPath.c_str(), &st) != 0 && (errno == ENOENT)) {;
00688       FILE *fadm = fopen(fAdminPath.c_str(), "w");
00689       fclose(fadm);
00690    }
00691 
00692    // Check the path
00693    bool rm = 0, ok = 0;
00694    if (stat(fUNIXSockPath.c_str(), &st) == 0 || (errno != ENOENT)) rm = 1;
00695    if (rm  && unlink(fUNIXSockPath.c_str()) != 0) {
00696       if (!S_ISSOCK(st.st_mode)) {
00697          TRACE(XERR, "non-socket path exists: unable to delete it: " <<fUNIXSockPath);
00698          return -1;
00699       } else {
00700          XPDPRT("WARNING: socket path exists: unable to delete it:"
00701                 " try to use it anyway " <<fUNIXSockPath);
00702          ok = 1;
00703       }
00704    }
00705 
00706    // Create the path
00707    int fd = 0;
00708    if (!ok) {
00709       if ((fd = open(fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
00710          TRACE(XERR, "unable to create path: " <<fUNIXSockPath);
00711          return -1;
00712       }
00713       close(fd);
00714    }
00715    if (fd > -1) {
00716       // Change ownership
00717       if (fUNIXSock->Bind((char *)fUNIXSockPath.c_str())) {
00718          TRACE(XERR, " problems binding to UNIX socket; path: " <<fUNIXSockPath);
00719          return -1;
00720       } else
00721          TRACE(DBG, "path for UNIX for socket is " <<fUNIXSockPath);
00722    } else {
00723       TRACE(XERR, "unable to open / create path for UNIX socket; tried path "<< fUNIXSockPath);
00724       return -1;
00725    }
00726 
00727    // Change ownership if running as super-user
00728    if (!geteuid()) {
00729       XrdProofUI ui;
00730       XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
00731       if (chown(fUNIXSockPath.c_str(), ui.fUid, ui.fGid) != 0) {
00732          TRACE(XERR, "unable to change ownership of the UNIX socket"<<fUNIXSockPath);
00733          return -1;
00734       }
00735    }
00736 
00737    // We are done
00738    return 0;
00739 }
00740 
00741 //__________________________________________________________________________
00742 int XrdProofdProofServ::SetAdminPath(const char *a, bool assert)
00743 {
00744    // Set the admin path and make sure the file exists
00745 
00746    XPDLOC(SMGR, "ProofServ::SetAdminPath")
00747 
00748    XrdSysMutexHelper mhp(fMutex);
00749 
00750    fAdminPath = a;
00751 
00752    // If we are not asked to assert the file we are done
00753    if (!assert) return 0;
00754 
00755    // The session file
00756    struct stat st;
00757    if (stat(a, &st) != 0 && errno == ENOENT) {
00758       // Create the file
00759       FILE *fpid = fopen(a, "w");
00760       if (fpid) {
00761          fclose(fpid);
00762       } else {
00763          TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
00764          return -1;
00765       }
00766    }
00767 
00768    // The status file
00769    XrdOucString fn;
00770    XPDFORM(fn, "%s.status", a);
00771    if (stat(fn.c_str(), &st) != 0 && errno == ENOENT) {
00772       // Create the file
00773       FILE *fpid = fopen(fn.c_str(), "w");
00774       if (fpid) {
00775          fprintf(fpid, "%d", fStatus);
00776          fclose(fpid);
00777       } else {
00778          TRACE(XERR, "unable to open / create status path "<< fn << "; errno = "<<errno);
00779          return -1;
00780       }
00781    }
00782    // Set the ownership of the status file to the user
00783    XrdProofUI ui;
00784    if (XrdProofdAux::GetUserInfo(fClient.c_str(), ui) != 0) {
00785       TRACE(XERR, "unable to get info for user "<<fClient<<"; errno = "<<errno);
00786       return -1;
00787    }
00788    if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
00789       TRACE(XERR, "unable to give ownership of the status file "<< fn << " to user; errno = "<<errno);
00790       return -1;
00791    }
00792    // Check
00793    if (stat(fn.c_str(), &st) != 0) {
00794       TRACE(XERR, "creation/assertion of the status path "<< fn << " failed; errno = "<<errno);
00795       return -1;
00796    } else {
00797       TRACE(ALL, "creation/assertion of the status path "<< fn << " was successful!");
00798    }
00799 
00800    // Done
00801    return 0;
00802 }
00803 
00804 //______________________________________________________________________________
00805 int XrdProofdProofServ::Resume()
00806 {
00807    // Send a resume message to the this session. It is assumed that the session
00808    // has at least one async query to process and will immediately send
00809    // a getworkers request (the workers are already assigned).
00810    XPDLOC(SMGR, "ProofServ::Resume")
00811 
00812    TRACE(REQ, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
00813 
00814    int rc = 0;
00815    XrdOucString msg;
00816 
00817    {  XrdSysMutexHelper mhp(fMutex);
00818       // 
00819       if (!fResponse || fResponse->Send(kXR_attn, kXPD_resume, 0, 0) != 0) {
00820          msg = "could not propagate resume to proofsrv";
00821          rc = -1;
00822       }
00823    }
00824 
00825    // Notify errors, if any
00826    if (rc != 0)
00827       TRACE(XERR, msg);
00828 
00829    // Done
00830    return rc;
00831 }
00832 
00833 //__________________________________________________________________________
00834 static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
00835 {
00836    // Decrease active session counters on worker w
00837    XPDLOC(PMGR, "ExportWorkerDescription")
00838 
00839    XrdOucString *wrks = (XrdOucString *)s;
00840    if (w && wrks) {
00841       // Master at the beginning
00842       if (w->fType == 'M') {
00843          if (wrks->length() > 0) wrks->insert('&',0);
00844          wrks->insert(w->Export(), 0);
00845       } else {
00846          // Add separator if not the first
00847          if (wrks->length() > 0)
00848             (*wrks) += '&';
00849          // Add export version of the info
00850          (*wrks) += w->Export(k);
00851       }
00852       TRACE(HDBG, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
00853       // Check next
00854       return 0;
00855    }
00856 
00857    // Not enough info: stop
00858    return 1;
00859 }
00860 
00861 //__________________________________________________________________________
00862 void XrdProofdProofServ::ExportWorkers(XrdOucString &wrks)
00863 {
00864    // Export the assigned workers in the format understood by proofserv
00865 
00866    XrdSysMutexHelper mhp(fMutex);
00867    wrks = "";
00868    fWorkers.Apply(ExportWorkerDescription, (void *)&wrks);
00869 }
00870 
00871 //__________________________________________________________________________
00872 void XrdProofdProofServ::DumpQueries()
00873 {
00874    // Export the assigned workers in the format understood by proofserv
00875    XPDLOC(PMGR, "DumpQueries")
00876 
00877    XrdSysMutexHelper mhp(fMutex);
00878 
00879    TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
00880    TRACE(ALL," +++ client: "<<fClient<<", session: "<< fSrvPID <<
00881              ", # of queries: "<< fQueries.size());
00882    std::list<XrdProofQuery *>::iterator ii;
00883    int i = 0;
00884    for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
00885       i++;
00886       TRACE(ALL," +++ #"<<i<<" tag:"<< (*ii)->GetTag()<<" dset: "<<
00887                 (*ii)->GetDSName()<<" size:"<<(*ii)->GetDSSize());
00888    }
00889    TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
00890 }
00891 
00892 //__________________________________________________________________________
00893 XrdProofQuery *XrdProofdProofServ::GetQuery(const char *tag)
00894 {
00895    // Get query with tag form the list of queries
00896    XrdProofQuery *q = 0;
00897    if (!tag || strlen(tag) <= 0) return q;
00898 
00899    XrdSysMutexHelper mhp(fMutex);
00900 
00901    if (fQueries.size() <= 0) return q;
00902 
00903    std::list<XrdProofQuery *>::iterator ii;
00904    for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
00905       q = *ii;
00906       if (!strcmp(tag, q->GetTag())) break;
00907       q = 0;
00908    }
00909    // Done
00910    return q;
00911 }
00912 
00913 //__________________________________________________________________________
00914 void XrdProofdProofServ::RemoveQuery(const char *tag)
00915 {
00916    // remove query with tag form the list of queries
00917    XrdProofQuery *q = 0;
00918    if (!tag || strlen(tag) <= 0) return;
00919 
00920    XrdSysMutexHelper mhp(fMutex);
00921 
00922    if (fQueries.size() <= 0) return;
00923 
00924    std::list<XrdProofQuery *>::iterator ii;
00925    for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
00926       q = *ii;
00927       if (!strcmp(tag, q->GetTag())) break;
00928       q = 0;
00929    }
00930    // remove it
00931    if (q) {
00932       fQueries.remove(q);
00933       delete q;
00934    }
00935 
00936    // Done
00937    return;
00938 }
00939 
00940 //__________________________________________________________________________
00941 static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
00942 {
00943    // Decrease active session counters on worker w
00944 
00945    int *actw = (int *)s;
00946    if (w && actw) {
00947       *actw += w->GetNActiveSessions();
00948       // Check next
00949       return 0;
00950    }
00951 
00952    // Not enough info: stop
00953    return 1;
00954 }
00955 
00956 //__________________________________________________________________________
00957 void XrdProofdProofServ::SendClusterInfo(int nsess, int nacti)
00958 {
00959    // Calculate the effective number of users on this session nodes
00960    // and communicate it to the master together with the total number
00961    // of sessions and the number of active sessions. for monitoring issues.
00962    XPDLOC(PMGR, "SendClusterInfo")
00963 
00964    // Only if we are active
00965    if (fWorkers.Num() <= 0) return;
00966 
00967    int actw = 0;
00968    fWorkers.Apply(CountEffectiveSessions, (void *)&actw);
00969    // The number of effective sessions * 1000
00970    int neffs = (actw*1000)/fWorkers.Num();
00971    TRACE(DBG, "# sessions: "<<nsess<<", # active: "<<nacti<<", # effective: "<<neffs/1000.);
00972 
00973    XrdSysMutexHelper mhp(fMutex);
00974 
00975    // Prepare buffer
00976    int len = 3*sizeof(kXR_int32);
00977    char *buf = new char[len];
00978    kXR_int32 off = 0;
00979    kXR_int32 itmp = nsess;
00980    itmp = static_cast<kXR_int32>(htonl(itmp));
00981    memcpy(buf + off, &itmp, sizeof(kXR_int32));
00982    off += sizeof(kXR_int32);
00983    itmp = nacti;
00984    itmp = static_cast<kXR_int32>(htonl(itmp));
00985    memcpy(buf + off, &itmp, sizeof(kXR_int32));
00986    off += sizeof(kXR_int32);
00987    itmp = neffs;
00988    itmp = static_cast<kXR_int32>(htonl(itmp));
00989    memcpy(buf + off, &itmp, sizeof(kXR_int32));
00990    // Send over
00991    if (!fResponse || fResponse->Send(kXR_attn, kXPD_clusterinfo, buf, len) != 0) {
00992       // Failure
00993       TRACE(XERR,"problems sending proofserv");
00994    }
00995 
00996 }
00997 
00998 //__________________________________________________________________________
00999 int XrdProofdProofServ::CheckSession(bool oldvers, bool isrec,
01000                                       int shutopt, int shutdel, bool changeown, int &nc)
01001 {
01002    // Calculate the effective number of users on this session nodes
01003    // and communicate it to the master together with the total number
01004    // of sessions and the number of active sessions. for monitoring issues.
01005    XPDLOC(PMGR, "SendClusterInfo")
01006 
01007    XrdOucString emsg;
01008    bool rmsession = 0;
01009    nc = -1;
01010    {  XrdSysMutexHelper mhp(fMutex);
01011 
01012       bool skipcheck = fSkipCheck;
01013       fSkipCheck = false;
01014 
01015       if (!skipcheck || oldvers) {
01016          nc = 0;
01017          // Remove this from the list of clients
01018          std::vector<XrdClientID *>::iterator i;
01019          for (i = fClients.begin(); i != fClients.end(); ++i) {
01020             if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
01021          }
01022          // Check if we need to shutdown it
01023          if (nc <= 0 && (!isrec || oldvers)) {
01024             int idlet = -1, disct = -1, now = time(0);
01025             if (fStatus == kXPD_idle)
01026                idlet = now - fSetIdleTime;
01027             if (idlet <= 0) idlet = -1;
01028             if (fDisconnectTime > 0)
01029                disct = now - fDisconnectTime;
01030             if (disct <= 0) disct = -1;
01031             if ((fSrvType != kXPD_TopMaster) ||
01032                 (shutopt == 1 && (idlet >= shutdel)) ||
01033                 (shutopt == 2 && (disct >= shutdel))) {
01034                // Send a terminate signal to the proofserv
01035                if (fSrvPID > -1) {
01036                   XrdProofUI ui;
01037                   XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
01038                   if (XrdProofdAux::KillProcess(fSrvPID, 0, ui, changeown) != 0) {
01039                      XPDFORM(emsg, "ord: problems signalling process: %d", fSrvPID);
01040                   }
01041                   fIsShutdown = true;
01042                }
01043                rmsession = 1;
01044             }
01045          }
01046       }
01047    }
01048    // Notify error, if any
01049    if (emsg.length() > 0) {
01050       TRACE(XERR,emsg.c_str());
01051    }
01052    // Done
01053    return rmsession;
01054 }

Generated on Tue Jul 5 14:51:51 2011 for ROOT_528-00b_version by  doxygen 1.5.1