XrdProofdProtocol.cxx

Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofdProtocol.cxx 33410 2010-05-06 16:17:11Z ganis $
00002 // Author: Gerardo Ganis  12/12/2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // XrdProofdProtocol                                                    //
00015 //                                                                      //
00016 // Authors: G. Ganis, CERN, 2005                                        //
00017 //                                                                      //
00018 // XrdProtocol implementation to coordinate 'proofserv' applications.   //
00019 //                                                                      //
00020 //////////////////////////////////////////////////////////////////////////
00021 
00022 #include "XrdProofdPlatform.h"
00023 
00024 #ifdef OLDXRDOUC
00025 #  include "XrdOuc/XrdOucError.hh"
00026 #  include "XrdOuc/XrdOucLogger.hh"
00027 #else
00028 #  include "XrdSys/XrdSysError.hh"
00029 #  include "XrdSys/XrdSysLogger.hh"
00030 #endif
00031 #include "XrdSys/XrdSysPriv.hh"
00032 #include "XrdOuc/XrdOucStream.hh"
00033 
00034 #include "XrdVersion.hh"
00035 #include "Xrd/XrdBuffer.hh"
00036 #include "XrdNet/XrdNetDNS.hh"
00037 
00038 #include "XrdProofdClient.h"
00039 #include "XrdProofdClientMgr.h"
00040 #include "XrdProofdConfig.h"
00041 #include "XrdProofdManager.h"
00042 #include "XrdProofdNetMgr.h"
00043 #include "XrdProofdPriorityMgr.h"
00044 #include "XrdProofdProofServMgr.h"
00045 #include "XrdProofdProtocol.h"
00046 #include "XrdProofdResponse.h"
00047 #include "XrdProofdProofServ.h"
00048 #include "XrdProofSched.h"
00049 
00050 // Tracing utils
00051 #include "XrdProofdTrace.h"
00052 XrdOucTrace          *XrdProofdTrace = 0;
00053 
00054 // Loggers: we need two to avoid deadlocks
00055 static XrdSysLogger   gMainLogger;
00056 
00057 //
00058 // Static area: general protocol managing section
00059 int                   XrdProofdProtocol::fgCount    = 0;
00060 XrdObjectQ<XrdProofdProtocol>
00061                       XrdProofdProtocol::fgProtStack("ProtStack",
00062                                                      "xproofd protocol anchor");
00063 XrdSysRecMutex        XrdProofdProtocol::fgBMutex;    // Buffer management mutex
00064 XrdBuffManager       *XrdProofdProtocol::fgBPool    = 0;
00065 int                   XrdProofdProtocol::fgMaxBuffsz= 0;
00066 XrdSysError           XrdProofdProtocol::fgEDest(0, "xpd");
00067 XrdSysLogger         *XrdProofdProtocol::fgLogger   = 0;
00068 //
00069 // Static area: protocol configuration section
00070 bool                  XrdProofdProtocol::fgConfigDone = 0;
00071 //
00072 int                   XrdProofdProtocol::fgReadWait = 0;
00073 // Cluster manager
00074 XrdProofdManager     *XrdProofdProtocol::fgMgr = 0;
00075 
00076 // Effective uid
00077 int                   XrdProofdProtocol::fgEUidAtStartup = -1;
00078 
00079 // Local definitions
00080 #define MAX_ARGS 128
00081 
00082 // Macros used to set conditional options
00083 #ifndef XPDCOND
00084 #define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns))
00085 #endif
00086 #ifndef XPDSETSTRING
00087 #define XPDSETSTRING(n,ns,c,s) \
00088  { if (XPDCOND(n,ns)) { \
00089      SafeFree(c); c = strdup(s.c_str()); ns = n; }}
00090 #endif
00091 
00092 #ifndef XPDADOPTSTRING
00093 #define XPDADOPTSTRING(n,ns,c,s) \
00094   { char *t = 0; \
00095     XPDSETSTRING(n, ns, t, s); \
00096     if (t && strlen(t)) { \
00097        SafeFree(c); c = t; \
00098   } else \
00099        SafeFree(t); }
00100 #endif
00101 
00102 #ifndef XPDSETINT
00103 #define XPDSETINT(n,ns,i,s) \
00104  { if (XPDCOND(n,ns)) { \
00105      i = strtol(s.c_str(),0,10); ns = n; }}
00106 #endif
00107 
00108 typedef struct {
00109    kXR_int32 ptyp;  // must be always 0 !
00110    kXR_int32 rlen;
00111    kXR_int32 pval;
00112    kXR_int32 styp;
00113 } hs_response_t;
00114 
00115 typedef struct ResetCtrlcGuard {
00116    XrdProofdProtocol *xpd;
00117    int                type;
00118    ResetCtrlcGuard(XrdProofdProtocol *p, int t) : xpd(p), type(t) { }
00119    ~ResetCtrlcGuard() { if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
00120 } ResetCtrlcGuard_t;
00121 
00122 //
00123 // Derivation of XrdProofdConfig to read the port from the config file
00124 class XrdProofdProtCfg : public XrdProofdConfig {
00125 public:
00126    int  fPort; // The port on which we listen
00127    XrdProofdProtCfg(const char *cfg, XrdSysError *edest = 0);
00128    int  DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool);
00129    void RegisterDirectives();
00130 };
00131 
00132 //__________________________________________________________________________
00133 XrdProofdProtCfg::XrdProofdProtCfg(const char *cfg, XrdSysError *edest)
00134                  : XrdProofdConfig(cfg, edest)
00135 {
00136    // Constructor
00137 
00138    fPort = -1;
00139    RegisterDirectives();
00140 }
00141 
00142 //__________________________________________________________________________
00143 void XrdProofdProtCfg::RegisterDirectives()
00144 {
00145    // Register directives for configuration
00146 
00147    Register("port", new XrdProofdDirective("port", this, &DoDirectiveClass));
00148    Register("xrd.protocol", new XrdProofdDirective("xrd.protocol", this, &DoDirectiveClass));
00149 }
00150 
00151 //______________________________________________________________________________
00152 int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
00153                                   char *val, XrdOucStream *cfg, bool)
00154 {
00155    // Parse directives
00156 
00157    if (!d) return -1;
00158 
00159    XrdOucString port(val);
00160    if (d->fName == "xrd.protocol") {
00161       port = cfg->GetWord();
00162       port.replace("xproofd:", "");
00163    } else if (d->fName != "port") {
00164       return -1;
00165    }
00166    if (port.length() > 0) {
00167       fPort = strtol(port.c_str(), 0, 10);
00168    }
00169    fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
00170    return 0;
00171 }
00172 
00173 
00174 extern "C" {
00175 //_________________________________________________________________________________
00176 XrdProtocol *XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
00177 {
00178    // This protocol is meant to live in a shared library. The interface below is
00179    // used by the server to obtain a copy of the protocol object that can be used
00180    // to decide whether or not a link is talking a particular protocol.
00181 
00182    // Return the protocol object to be used if static init succeeds
00183    if (XrdProofdProtocol::Configure(parms, pi)) {
00184 
00185       return (XrdProtocol *) new XrdProofdProtocol();
00186    }
00187    return (XrdProtocol *)0;
00188 }
00189 
00190 //_________________________________________________________________________________
00191 int XrdgetProtocolPort(const char * /*pname*/, char * /*parms*/, XrdProtocol_Config *pi)
00192 {
00193       // This function is called early on to determine the port we need to use. The
00194       // The default is ostensibly 1093 but can be overidden; which we allow.
00195 
00196       XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
00197       // Init some relevant quantities for tracing
00198       XrdProofdTrace = new XrdOucTrace(pi->eDest);
00199       pcfg.Config(0);
00200 
00201       // Default XPD_DEF_PORT (1093)
00202       int port = XPD_DEF_PORT;
00203 
00204       if (pcfg.fPort > 0) {
00205          port = pcfg.fPort;
00206       } else {
00207          port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
00208       }
00209 
00210       return port;
00211 }}
00212 
00213 //__________________________________________________________________________________
00214 XrdProofdProtocol::XrdProofdProtocol()
00215    : XrdProtocol("xproofd protocol handler"), fProtLink(this)
00216 {
00217    // Protocol constructor
00218    fLink = 0;
00219    fArgp = 0;
00220    fPClient = 0;
00221    fSecClient = 0;
00222    fAuthProt = 0;
00223    fResponses.reserve(10);
00224 
00225    // Instantiate a Proofd protocol object
00226    Reset();
00227 }
00228 
00229 //______________________________________________________________________________
00230 XrdProofdResponse *XrdProofdProtocol::Response(kXR_unt16 sid)
00231 {
00232    // Get response instance corresponding to stream ID 'sid'
00233    XPDLOC(ALL, "Protocol::Response")
00234 
00235    TRACE(HDBG, "sid: "<<sid<<", size: "<<fResponses.size());
00236 
00237    if (sid > 0)
00238       if (sid <= fResponses.size())
00239          return fResponses[sid-1];
00240 
00241    return (XrdProofdResponse *)0;
00242 }
00243 
00244 //______________________________________________________________________________
00245 XrdProofdResponse *XrdProofdProtocol::GetNewResponse(kXR_unt16 sid)
00246 {
00247    // Create new response instance for stream ID 'sid'
00248    XPDLOC(ALL, "Protocol::GetNewResponse")
00249 
00250    XrdOucString msg;
00251    XPDFORM(msg, "sid: %d", sid);
00252    if (sid > 0) {
00253       if (sid > fResponses.size()) {
00254          if (sid > fResponses.capacity()) {
00255             int newsz = (sid < 2 * fResponses.capacity()) ? 2 * fResponses.capacity() : sid+1 ;
00256             fResponses.reserve(newsz);
00257             if (TRACING(DBG)) {
00258                msg += " new capacity: ";
00259                msg += (int) fResponses.capacity();
00260             }
00261         }
00262          int nnew = sid - fResponses.size();
00263          while (nnew--)
00264             fResponses.push_back(new XrdProofdResponse());
00265          if (TRACING(DBG)) {
00266             msg += "; new size: ";
00267             msg += (int) fResponses.size();
00268          }
00269       }
00270    } else {
00271       TRACE(XERR,"wrong sid: "<<sid);
00272       return (XrdProofdResponse *)0;
00273    }
00274 
00275    TRACE(DBG, msg);
00276 
00277    // Done
00278    return fResponses[sid-1];
00279 }
00280 
00281 //______________________________________________________________________________
00282 XrdProtocol *XrdProofdProtocol::Match(XrdLink *lp)
00283 {
00284    // Check whether the request matches this protocol
00285 
00286    struct ClientInitHandShake hsdata;
00287    char  *hsbuff = (char *)&hsdata;
00288 
00289    static hs_response_t hsresp = {0, 0, htonl(XPROOFD_VERSBIN), 0};
00290 
00291    XrdProofdProtocol *xp;
00292    int dlen;
00293 
00294    // Peek at the first 20 bytes of data
00295    if ((dlen = lp->Peek(hsbuff,sizeof(hsdata),fgReadWait)) != sizeof(hsdata)) {
00296       if (dlen <= 0) lp->setEtext("Match: handshake not received");
00297       return (XrdProtocol *)0;
00298    }
00299 
00300    // Verify that this is our protocol
00301    hsdata.third  = ntohl(hsdata.third);
00302    if (dlen != sizeof(hsdata) ||  hsdata.first || hsdata.second
00303        || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) return 0;
00304 
00305    // Respond to this request with the handshake response
00306    if (!lp->Send((char *)&hsresp, sizeof(hsresp))) {
00307       lp->setEtext("Match: handshake failed");
00308       return (XrdProtocol *)0;
00309    }
00310 
00311    // We can now read all 20 bytes and discard them (no need to wait for it)
00312    int len = sizeof(hsdata);
00313    if (lp->Recv(hsbuff, len) != len) {
00314       lp->setEtext("Match: reread failed");
00315       return (XrdProtocol *)0;
00316    }
00317 
00318    // Get a protocol object off the stack (if none, allocate a new one)
00319    if (!(xp = fgProtStack.Pop()))
00320       xp = new XrdProofdProtocol();
00321 
00322    // Bind the protocol to the link and return the protocol
00323    xp->fLink = lp;
00324    strcpy(xp->fSecEntity.prot, "host");
00325    xp->fSecEntity.host = strdup((char *)lp->Host());
00326 
00327    // Dummy data used by 'proofd'
00328    kXR_int32 dum[2];
00329    if (xp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) {
00330       xp->Recycle(0,0,0);
00331       return (XrdProtocol *)0;
00332    }
00333 
00334    // We are done
00335    return (XrdProtocol *)xp;
00336 }
00337 
00338 //_____________________________________________________________________________
00339 int XrdProofdProtocol::Stats(char *buff, int blen, int)
00340 {
00341    // Return statistics info about the protocol.
00342    // Not really implemented yet: this is a reduced XrdXrootd version.
00343 
00344    static char statfmt[] = "<stats id=\"xproofd\"><num>%ld</num></stats>";
00345 
00346    // If caller wants only size, give it to him
00347    if (!buff)
00348       return sizeof(statfmt)+16;
00349 
00350    // We have only one statistic -- number of successful matches
00351    return snprintf(buff, blen, statfmt, fgCount);
00352 }
00353 
00354 //______________________________________________________________________________
00355 void XrdProofdProtocol::Reset()
00356 {
00357    // Reset static and local vars
00358 
00359    // Init local vars
00360    fLink      = 0;
00361    fPid       = -1;
00362    fArgp      = 0;
00363    fStatus    = 0;
00364    fClntCapVer = 0;
00365    fConnType  = kXPD_ClientMaster;
00366    fSuperUser = 0;
00367    fPClient   = 0;
00368    fCID       = -1;
00369    fTraceID   = "";
00370    fAdminPath = "";
00371    if (fAuthProt) {
00372       fAuthProt->Delete();
00373       fAuthProt = 0;
00374    }
00375    memset(&fSecEntity, 0, sizeof(fSecEntity));
00376    // Cleanup existing XrdProofdResponse objects
00377    std::vector<XrdProofdResponse *>::iterator ii = fResponses.begin(); // One per each logical connection
00378    while (ii != fResponses.end()) {
00379       delete *ii;
00380       ii++;
00381    }
00382    fResponses.clear();
00383 }
00384 
00385 //______________________________________________________________________________
00386 int XrdProofdProtocol::Configure(char *, XrdProtocol_Config *pi)
00387 {
00388    // Protocol configuration tool
00389    // Function: Establish configuration at load time.
00390    // Output: 1 upon success or 0 otherwise.
00391    XPDLOC(ALL, "Protocol::Configure")
00392 
00393    XrdOucString mp;
00394 
00395    // Only once
00396    if (fgConfigDone)
00397       return 1;
00398    fgConfigDone = 1;
00399 
00400    // Copy out the special info we want to use at top level
00401    fgLogger = pi->eDest->logger();
00402    fgEDest.logger(fgLogger);
00403    if (XrdProofdTrace) delete XrdProofdTrace; // It could have been initialized in XrdgetProtocolPort
00404    XrdProofdTrace = new XrdOucTrace(&fgEDest);
00405    fgBPool        = pi->BPool;
00406    fgReadWait     = pi->readWait;
00407 
00408    // Pre-initialize some i/o values
00409    fgMaxBuffsz = fgBPool->MaxSize();
00410 
00411    // Schedule protocol object cleanup; the maximum number of objects
00412    // and the max age are taken from XrdXrootdProtocol: this may need
00413    // some optimization in the future.
00414    fgProtStack.Set(pi->Sched, XrdProofdTrace, TRACE_MEM);
00415    fgProtStack.Set((pi->ConnMax/3 ? pi->ConnMax/3 : 30), 60*60);
00416 
00417    // Default tracing options: always trace logins and errors for all
00418    // domains; if the '-d' option was specified on the command line then
00419    // trace also REQ and FORM.
00420    // NB: these are superseeded by settings in the config file (xpd.trace)
00421    XrdProofdTrace->What = TRACE_DOMAINS;
00422    TRACESET(XERR, 1);
00423    TRACESET(LOGIN, 1);
00424    TRACESET(RSP, 0);
00425    if (pi->DebugON)
00426       XrdProofdTrace->What |= (TRACE_REQ | TRACE_FORK);
00427 
00428    // Work as root to avoid contineous changes of the effective user
00429    // (users are logged in their box after forking)
00430    fgEUidAtStartup = geteuid();
00431    if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0);
00432 
00433    // Process the config file for directives meaningful to us
00434    // Create and Configure the manager
00435    fgMgr = new XrdProofdManager(pi, &fgEDest);
00436    if (fgMgr->Config(0)) return 0;
00437    mp = "global manager created";
00438    TRACE(ALL, mp);
00439 
00440    // Issue herald indicating we configured successfully
00441    TRACE(ALL, "xproofd protocol version "<<XPROOFD_VERSION<<
00442               " build "<<XrdVERSION<<" successfully loaded");
00443 
00444    // Return success
00445    return 1;
00446 }
00447 
00448 //______________________________________________________________________________
00449 int XrdProofdProtocol::Process(XrdLink *)
00450 {
00451    // Process the information received on the active link.
00452    // (We ignore the argument here)
00453    XPDLOC(ALL, "Protocol::Process")
00454 
00455    int rc = 0;
00456    TRACEP(this, DBG, "instance: " << this);
00457 
00458    // Read the next request header
00459    if ((rc = GetData("request", (char *)&fRequest, sizeof(fRequest))) != 0)
00460       return rc;
00461    TRACEP(this, HDBG, "after GetData: rc: " << rc);
00462 
00463    // Deserialize the data
00464    fRequest.header.requestid = ntohs(fRequest.header.requestid);
00465    fRequest.header.dlen      = ntohl(fRequest.header.dlen);
00466 
00467    // Get response object
00468    kXR_unt16 sid;
00469    memcpy((void *)&sid, (const void *)&(fRequest.header.streamid[0]), 2);
00470    XrdProofdResponse *response = 0;
00471    if (!(response = Response(sid))) {
00472       if (!(response = GetNewResponse(sid))) {
00473          TRACEP(this, XERR, "could not get Response instance for rid: "<< sid);
00474          return rc;
00475       }
00476    }
00477    // Set the stream ID for the reply
00478    response->Set(fRequest.header.streamid);
00479    response->Set(fLink);
00480 
00481    TRACEP(this, REQ, "sid: " << sid << ", req id: " << fRequest.header.requestid <<
00482                 " (" << XrdProofdAux::ProofRequestTypes(fRequest.header.requestid)<<
00483                 ")" << ", dlen: " <<fRequest.header.dlen);
00484 
00485    // Every request has an associated data length. It better be >= 0 or we won't
00486    // be able to know how much data to read.
00487    if (fRequest.header.dlen < 0) {
00488       response->Send(kXR_ArgInvalid, "Process: Invalid request data length");
00489       return fLink->setEtext("Process: protocol data length error");
00490    }
00491 
00492    // Read any argument data at this point, except when the request is to forward
00493    // a buffer: the argument may have to be segmented and we're not prepared to do
00494    // that here.
00495    if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
00496       if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
00497          response->Send(kXR_ArgTooLong, "fRequest.argument is too long");
00498          return rc;
00499       }
00500       if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen)))
00501          return rc;
00502       fArgp->buff[fRequest.header.dlen] = '\0';
00503    }
00504 
00505    // Continue with request processing at the resume point
00506    return Process2();
00507 }
00508 
00509 //______________________________________________________________________________
00510 int XrdProofdProtocol::Process2()
00511 {
00512    // Local processing method: here the request is dispatched to the appropriate
00513    // method
00514    XPDLOC(ALL, "Protocol::Process2")
00515 
00516    int rc = 0;
00517    XPD_SETRESP(this, "Process2");
00518 
00519    TRACEP(this, REQ, "req id: " << fRequest.header.requestid << " (" <<
00520                 XrdProofdAux::ProofRequestTypes(fRequest.header.requestid) << ")");
00521 
00522    ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);
00523 
00524    // If the user is logged in check if the wanted action is to be done by us
00525    if (fStatus && (fStatus & XPD_LOGGEDIN)) {
00526       // Record time of the last action
00527       TouchAdminPath();
00528       // We must have a client instance if here
00529       if (!fPClient) {
00530          TRACEP(this, XERR, "client undefined!!! ");
00531          response->Send(kXR_InvalidRequest,"client undefined!!! ");
00532          return 0;
00533       }
00534       bool formgr = 0;
00535       switch(fRequest.header.requestid) {
00536          case kXP_ctrlc:
00537             rc = CtrlC();
00538             break;
00539          case kXP_touch:
00540             // Reset the asked-to-touch flag, if it was never set
00541             fPClient->Touch(1);
00542             break;
00543          case kXP_interrupt:
00544             rc = Interrupt();
00545             break;
00546          case kXP_ping:
00547             rc = Ping();
00548             break;
00549          case kXP_sendmsg:
00550             rc = SendMsg();
00551             break;
00552          case kXP_urgent:
00553             rc = Urgent();
00554             break;
00555          default:
00556             formgr = 1;
00557       }
00558       if (!formgr) {
00559          // Check the link
00560          if (!fLink || (fLink->FDnum() <= 0)) {
00561             TRACE(XERR, "link is undefined! ");
00562             return -1;
00563          }
00564          return rc;
00565       }
00566    }
00567 
00568    // The request is for the manager
00569    rc = fgMgr->Process(this);
00570    // Check the link
00571    if (!fLink || (fLink->FDnum() <= 0)) {
00572       TRACE(XERR, "link is undefined! ");
00573       return -1;
00574    }
00575    return rc;
00576 }
00577 
00578 //______________________________________________________________________
00579 void XrdProofdProtocol::Recycle(XrdLink *, int, const char *)
00580 {
00581    // Recycle call. Release the instance and give it back to the stack.
00582    XPDLOC(ALL, "Protocol::Recycle")
00583 
00584    const char *srvtype[6] = {"ANY", "MasterWorker", "MasterMaster",
00585                              "ClientMaster", "Internal", "Admin"};
00586    XrdOucString buf;
00587 
00588    // Document the disconnect
00589    if (fPClient)
00590       XPDFORM(buf, "user %s disconnected; type: %s", fPClient->User(),
00591                    srvtype[fConnType+1]);
00592    else
00593       XPDFORM(buf, "user disconnected; type: %s", srvtype[fConnType+1]);
00594    TRACEP(this, LOGIN, buf);
00595 
00596    // If we have a buffer, release it
00597    if (fArgp) {
00598       fgBPool->Release(fArgp);
00599       fArgp = 0;
00600    }
00601 
00602    // Locate the client instance
00603    XrdProofdClient *pmgr = fPClient;
00604 
00605    if (pmgr) {
00606 
00607 
00608       if (!Internal()) {
00609 
00610          // Signal the client manager that a client has just gone
00611          if (fgMgr && fgMgr->ClientMgr()) {
00612             TRACE(HDBG, "fAdminPath: "<<fAdminPath);
00613             XPDFORM(buf, "%s %p %d %d", fAdminPath.c_str(), pmgr, fCID, fPid);
00614             TRACE(DBG, "sending to ClientMgr: "<<buf);
00615             fgMgr->ClientMgr()->Pipe()->Post(XrdProofdClientMgr::kClientDisconnect, buf.c_str());
00616          }
00617 
00618       } else {
00619 
00620          // Internal connection: we need to remove this instance from the list
00621          // of proxy servers and to notify the attached clients.
00622          // Tell the session manager that this session has gone
00623          if (fgMgr && fgMgr->SessionMgr()) {
00624             TRACE(HDBG, "fAdminPath: "<<fAdminPath);
00625             buf.assign(fAdminPath, fAdminPath.rfind('/') + 1, -1);
00626             TRACE(DBG, "sending to ProofServMgr: "<<buf);
00627             fgMgr->SessionMgr()->Pipe()->Post(XrdProofdProofServMgr::kSessionRemoval, buf.c_str());
00628          }
00629       }
00630    }
00631 
00632    // Set fields to starting point (debugging mostly)
00633    Reset();
00634 
00635    // Push ourselves on the stack
00636    fgProtStack.Push(&fProtLink);
00637 }
00638 
00639 //______________________________________________________________________________
00640 XrdBuffer *XrdProofdProtocol::GetBuff(int quantum, XrdBuffer *argp)
00641 {
00642    // Allocate a buffer to handle quantum bytes; if argp points to an existing
00643    // buffer, its size is checked and re-allocated if needed
00644    XPDLOC(ALL, "Protocol::GetBuff")
00645 
00646    TRACE(HDBG, "len: "<<quantum);
00647 
00648    // If we are given an existing buffer, we keep it if we use at least half
00649    // of it; otherwise we take a smaller one
00650    if (argp) {
00651       if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
00652          return argp;
00653    }
00654 
00655    // Release the buffer if too small
00656    XrdSysMutexHelper mh(fgBMutex);
00657    if (argp)
00658       fgBPool->Release(argp);
00659 
00660    // Obtain a new one
00661    if ((argp = fgBPool->Obtain(quantum)) == 0) {
00662       TRACE(XERR, "could not get requested buffer (size: "<<quantum<<
00663                   ") = insufficient memory");
00664    } else {
00665       TRACE(HDBG, "quantum: "<<quantum<<
00666                   ", buff: "<<(void *)(argp->buff)<<", bsize:"<<argp->bsize);
00667    }
00668 
00669    // Done
00670    return argp;
00671 }
00672 
00673 //______________________________________________________________________________
00674 void XrdProofdProtocol::ReleaseBuff(XrdBuffer *argp)
00675 {
00676    // Release a buffer previously allocated via GetBuff
00677 
00678    XrdSysMutexHelper mh(fgBMutex);
00679    fgBPool->Release(argp);
00680 }
00681 
00682 //______________________________________________________________________________
00683 int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen)
00684 {
00685    // Get data from the open link
00686    XPDLOC(ALL, "Protocol::GetData")
00687 
00688    int rlen;
00689 
00690    // Read the data but reschedule the link if we have not received all of the
00691    // data within the timeout interval.
00692    TRACEP(this, HDBG, "dtype: "<<(dtype ? dtype : " - ")<<", blen: "<<blen);
00693 
00694    // No need to lock:the link is disable while we are here
00695    rlen = fLink->Recv(buff, blen, fgReadWait);
00696    if (rlen  < 0) {
00697       if (rlen != -ENOMSG && rlen != -ECONNRESET) {
00698          XrdOucString emsg = "link read error: errno: ";
00699          emsg += -rlen;
00700          TRACEP(this, XERR, emsg.c_str());
00701          return (fLink ? fLink->setEtext(emsg.c_str()) : -1);
00702       } else {
00703          TRACEP(this, HDBG, "connection closed by peer (errno: "<<-rlen<<")");
00704          return -1;
00705       }
00706    }
00707    if (rlen < blen) {
00708       TRACEP(this, DBG, dtype << " timeout; read " <<rlen <<" of " <<blen <<" bytes - rescheduling");
00709       return 1;
00710    }
00711    TRACEP(this, HDBG, "rlen: "<<rlen);
00712 
00713    return 0;
00714 }
00715 
00716 //______________________________________________________________________________
00717 int XrdProofdProtocol::SendData(XrdProofdProofServ *xps,
00718                                 kXR_int32 sid, XrdSrvBuffer **buf, bool savebuf)
00719 {
00720    // Send data over the open link. Segmentation is done here, if required.
00721    XPDLOC(ALL, "Protocol::SendData")
00722 
00723    int rc = 0;
00724 
00725    TRACEP(this, HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
00726 
00727    // Buffer length
00728    int len = fRequest.header.dlen;
00729 
00730    // Quantum size
00731    int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
00732 
00733    // Get a buffer
00734    XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
00735    if (!argp) return -1;
00736 
00737    // Now send over all of the data as unsolicited messages
00738    XrdOucString msg;
00739    while (len > 0) {
00740 
00741       XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0;
00742 
00743       if ((rc = GetData("data", argp->buff, quantum))) {
00744          { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00745          return -1;
00746       }
00747       if (buf && !(*buf) && savebuf)
00748          *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
00749       // Send
00750       if (sid > -1) {
00751          if (TRACING(HDBG))
00752             XPDFORM(msg, "EXT: server ID: %d, sending: %d bytes", sid, quantum);
00753          if (!response || response->Send(kXR_attn, kXPD_msgsid, sid,
00754                                          argp->buff, quantum) != 0) {
00755             { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00756             XPDFORM(msg, "EXT: server ID: %d, problems sending: %d bytes to server",
00757                          sid, quantum);
00758             TRACEP(this, XERR, msg);
00759             return -1;
00760          }
00761       } else {
00762 
00763          // Get ID of the client
00764          int cid = ntohl(fRequest.sendrcv.cid);
00765          if (TRACING(HDBG))
00766             XPDFORM(msg, "INT: client ID: %d, sending: %d bytes", cid, quantum);
00767          if (xps->SendData(cid, argp->buff, quantum) != 0) {
00768             { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00769             XPDFORM(msg, "INT: client ID: %d, problems sending: %d bytes to client",
00770                          cid, quantum);
00771             TRACEP(this, XERR, msg);
00772             return -1;
00773          }
00774       }
00775       TRACEP(this, HDBG, msg);
00776       // Next segment
00777       len -= quantum;
00778       if (len < quantum)
00779          quantum = len;
00780    }
00781 
00782    // Release the buffer
00783    { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
00784 
00785    // Done
00786    return 0;
00787 }
00788 
00789 //______________________________________________________________________________
00790 int XrdProofdProtocol::SendDataN(XrdProofdProofServ *xps,
00791                                  XrdSrvBuffer **buf, bool savebuf)
00792 {
00793    // Send data over the open client links of session 'xps'.
00794    // Used when all the connected clients are eligible to receive the message.
00795    // Segmentation is done here, if required.
00796    XPDLOC(ALL, "Protocol::SendDataN")
00797 
00798    int rc = 0;
00799 
00800    TRACEP(this, HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
00801 
00802    // Buffer length
00803    int len = fRequest.header.dlen;
00804 
00805    // Quantum size
00806    int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
00807 
00808    // Get a buffer
00809    XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
00810    if (!argp) return -1;
00811 
00812    // Now send over all of the data as unsolicited messages
00813    while (len > 0) {
00814       if ((rc = GetData("data", argp->buff, quantum))) {
00815          XrdProofdProtocol::ReleaseBuff(argp);
00816          return -1;
00817       }
00818       if (buf && !(*buf) && savebuf)
00819          *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
00820 
00821       // Send to connected clients
00822       if (xps->SendDataN(argp->buff, quantum) != 0) {
00823          XrdProofdProtocol::ReleaseBuff(argp);
00824          return -1;
00825       }
00826 
00827       // Next segment
00828       len -= quantum;
00829       if (len < quantum)
00830          quantum = len;
00831    }
00832 
00833    // Release the buffer
00834    XrdProofdProtocol::ReleaseBuff(argp);
00835 
00836    // Done
00837    return 0;
00838 }
00839 
00840 //_____________________________________________________________________________
00841 int XrdProofdProtocol::SendMsg()
00842 {
00843    // Handle a request to forward a message to another process
00844    XPDLOC(ALL, "Protocol::SendMsg")
00845 
00846    static const char *crecv[5] = {"master proofserv", "top master",
00847                                   "client", "undefined", "any"};
00848    int rc = 0;
00849 
00850    XPD_SETRESP(this, "SendMsg");
00851 
00852    // Unmarshall the data
00853    int psid = ntohl(fRequest.sendrcv.sid);
00854    int opt = ntohl(fRequest.sendrcv.opt);
00855 
00856    XrdOucString msg;
00857    // Find server session
00858    XrdProofdProofServ *xps = 0;
00859    if (!fPClient || !(xps = fPClient->GetServer(psid))) {
00860       XPDFORM(msg, "%s: session ID not found: %d", (Internal() ? "INT" : "EXT"), psid);
00861       TRACEP(this, XERR, msg.c_str());
00862       response->Send(kXR_InvalidRequest, msg.c_str());
00863       return 0;
00864    }
00865 
00866    // Message length
00867    int len = fRequest.header.dlen;
00868 
00869    if (!Internal()) {
00870 
00871       if (TRACING(HDBG)) {
00872          // Notify
00873          XPDFORM(msg, "EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
00874                      " cid: %d)", len, psid, xps, xps->Status(), fCID);
00875          TRACEP(this, HDBG, msg.c_str());
00876       }
00877 
00878       // Send to proofsrv our client ID
00879       if (fCID == -1) {
00880          TRACEP(this, REQ, "EXT: error getting clientSID");
00881          response->Send(kXP_ServerError,"EXT: getting clientSID");
00882          return 0;
00883       }
00884       if (SendData(xps, fCID)) {
00885          TRACEP(this, REQ, "EXT: error sending message to proofserv");
00886          response->Send(kXP_reconnecting,"EXT: sending message to proofserv");
00887          return 0;
00888       }
00889 
00890       // Notify to user
00891       response->Send();
00892 
00893    } else {
00894 
00895       if (TRACING(HDBG)) {
00896           // Notify
00897           XPDFORM(msg, "INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
00898                        len, psid, xps, xps->Status());
00899           TRACEP(this, HDBG, msg.c_str());
00900       }
00901       bool saveStartMsg = 0;
00902       XrdSrvBuffer *savedBuf = 0;
00903       // Additional info about the message
00904       if (opt & kXPD_setidle) {
00905          TRACEP(this, DBG, "INT: setting proofserv in 'idle' state");
00906          xps->SetStatus(kXPD_idle);
00907          PostSession(-1, fPClient->UI().fUser.c_str(),
00908                          fPClient->UI().fGroup.c_str(), xps);
00909       } else if (opt & kXPD_querynum) {
00910          TRACEP(this, DBG, "INT: got message with query number");
00911       } else if (opt & kXPD_startprocess) {
00912          TRACEP(this, DBG, "INT: setting proofserv in 'running' state");
00913          xps->SetStatus(kXPD_running);
00914          PostSession(1, fPClient->UI().fUser.c_str(),
00915                         fPClient->UI().fGroup.c_str(), xps);
00916          // Save start processing message for later clients
00917          xps->DeleteStartMsg();
00918          saveStartMsg = 1;
00919       } else if (opt & kXPD_logmsg) {
00920          // We broadcast log messages only not idle to catch the
00921          // result from processing
00922          if (xps->Status() == kXPD_running) {
00923             TRACEP(this, DBG, "INT: broadcasting log message");
00924             opt |= kXPD_fb_prog;
00925          }
00926       }
00927       bool fbprog = (opt & kXPD_fb_prog);
00928 
00929       if (!fbprog) {
00930          //
00931          // The message is strictly for the client requiring it
00932          if (SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
00933             response->Send(kXP_reconnecting,
00934                            "SendMsg: INT: session is reconnecting: retry later");
00935             return 0;
00936          }
00937       } else {
00938          // Send to all connected clients
00939          if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
00940             response->Send(kXP_reconnecting,
00941                            "SendMsg: INT: session is reconnecting: retry later");
00942             return 0;
00943          }
00944       }
00945       // Save start processing messages, if required
00946       if (saveStartMsg)
00947          xps->SetStartMsg(savedBuf);
00948 
00949       if (TRACING(DBG)) {
00950          int ii = xps->SrvType();
00951          if (ii > 3) ii = 3;
00952          if (ii < 0) ii = 4;
00953          XPDFORM(msg, "INT: message sent to %s (%d bytes)", crecv[ii], len);
00954          TRACEP(this, DBG, msg);
00955       }
00956       // Notify to proofsrv
00957       response->Send();
00958    }
00959 
00960    // Over
00961    return 0;
00962 }
00963 
00964 //______________________________________________________________________________
00965 int XrdProofdProtocol::Urgent()
00966 {
00967    // Handle generic request of a urgent message to be forwarded to the server
00968    XPDLOC(ALL, "Protocol::Urgent")
00969 
00970    unsigned int rc = 0;
00971 
00972    XPD_SETRESP(this, "Urgent");
00973 
00974    // Unmarshall the data
00975    int psid = ntohl(fRequest.proof.sid);
00976    int type = ntohl(fRequest.proof.int1);
00977    int int1 = ntohl(fRequest.proof.int2);
00978    int int2 = ntohl(fRequest.proof.int3);
00979 
00980    TRACEP(this, REQ, "psid: "<<psid<<", type: "<< type);
00981 
00982    // Find server session
00983    XrdProofdProofServ *xps = 0;
00984    if (!fPClient || !(xps = fPClient->GetServer(psid))) {
00985       TRACEP(this, XERR, "session ID not found: "<<psid);
00986       response->Send(kXR_InvalidRequest,"Urgent: session ID not found");
00987       return 0;
00988    }
00989 
00990    TRACEP(this, DBG, "xps: "<<xps<<", status: "<<xps->Status());
00991 
00992    // Check ID matching
00993    if (!xps->Match(psid)) {
00994       response->Send(kXP_InvalidRequest,"Urgent: IDs do not match - do nothing");
00995       return 0;
00996    }
00997 
00998    // Check the link to the session
00999    if (!xps->Response()) {
01000       response->Send(kXP_InvalidRequest,"Urgent: session response object undefined - do nothing");
01001       return 0;
01002    }
01003 
01004    // Prepare buffer
01005    int len = 3 *sizeof(kXR_int32);
01006    char *buf = new char[len];
01007    // Type
01008    kXR_int32 itmp = static_cast<kXR_int32>(htonl(type));
01009    memcpy(buf, &itmp, sizeof(kXR_int32));
01010    // First info container
01011    itmp = static_cast<kXR_int32>(htonl(int1));
01012    memcpy(buf + sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
01013    // Second info container
01014    itmp = static_cast<kXR_int32>(htonl(int2));
01015    memcpy(buf + 2 * sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
01016    // Send over
01017    if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) {
01018       response->Send(kXP_ServerError,
01019                      "Urgent: could not propagate request to proofsrv");
01020       return 0;
01021    }
01022 
01023    // Notify to user
01024    response->Send();
01025    TRACEP(this, DBG, "request propagated to proofsrv");
01026 
01027    // Over
01028    return 0;
01029 }
01030 
01031 //___________________________________________________________________________
01032 int XrdProofdProtocol::Interrupt()
01033 {
01034    // Handle an interrupt request
01035    XPDLOC(ALL, "Protocol::Interrupt")
01036 
01037    int rc = 0;
01038 
01039    XPD_SETRESP(this, "Interrupt");
01040 
01041    // Unmarshall the data
01042    int psid = ntohl(fRequest.interrupt.sid);
01043    int type = ntohl(fRequest.interrupt.type);
01044    TRACEP(this, REQ, "psid: "<<psid<<", type:"<<type);
01045 
01046    // Find server session
01047    XrdProofdProofServ *xps = 0;
01048    if (!fPClient || !(xps = fPClient->GetServer(psid))) {
01049       TRACEP(this, XERR, "session ID not found: "<<psid);
01050       response->Send(kXR_InvalidRequest,"Interrupt: session ID not found");
01051       return 0;
01052    }
01053 
01054    if (xps) {
01055 
01056       // Check ID matching
01057       if (!xps->Match(psid)) {
01058          response->Send(kXP_InvalidRequest,"Interrupt: IDs do not match - do nothing");
01059          return 0;
01060       }
01061 
01062       XrdOucString msg;
01063       XPDFORM(msg, "xps: %p, link ID: %s, proofsrv PID: %d",
01064                    xps, xps->Response()->TraceID(), xps->SrvPID());
01065       TRACEP(this, DBG, msg.c_str());
01066 
01067       // Propagate the type as unsolicited
01068       if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) {
01069          response->Send(kXP_ServerError,
01070                         "Interrupt: could not propagate interrupt code to proofsrv");
01071          return 0;
01072       }
01073 
01074       // Notify to user
01075       response->Send();
01076       TRACEP(this, DBG, "interrupt propagated to proofsrv");
01077    }
01078 
01079    // Over
01080    return 0;
01081 }
01082 
01083 //___________________________________________________________________________
01084 int XrdProofdProtocol::Ping()
01085 {
01086    // Handle a ping request.
01087    // For internal connections, ping is done asynchronously to avoid locking
01088    // problems; the session checker verifies that the admin file has been touched
01089    // recently enough; touching is done in Process2, so we have nothing to do here
01090    XPDLOC(ALL, "Protocol::Ping")
01091 
01092    int rc = 0;
01093    if (Internal()) {
01094       if (TRACING(HDBG)) {
01095          XPD_SETRESP(this, "Ping");
01096          TRACEP(this,  HDBG, "INT: nothing to do ");
01097       }
01098       return 0;
01099    }
01100    XPD_SETRESP(this, "Ping");
01101 
01102    // Unmarshall the data
01103    int psid = ntohl(fRequest.sendrcv.sid);
01104    int asyncopt = ntohl(fRequest.sendrcv.opt);
01105 
01106    TRACEP(this, REQ, "psid: "<<psid<<", async: "<<asyncopt);
01107 
01108    // For connections to servers find the server session; manager connections
01109    // (psid == -1) do not have any session attached 
01110    XrdProofdProofServ *xps = 0;
01111    if (!fPClient || (psid > -1 && !(xps = fPClient->GetServer(psid)))) {
01112       TRACEP(this,  XERR, "session ID not found: "<<psid);
01113       response->Send(kXR_InvalidRequest,"session ID not found");
01114       return 0;
01115    }
01116 
01117    // For manager connections we are done
01118    kXR_int32 pingres = (psid > -1) ? 0 : 1;
01119    if (psid > -1 && xps && xps->IsValid()) {
01120 
01121       TRACEP(this,  DBG, "EXT: psid: "<<psid);
01122 
01123       // This is the max time we will privide an answer
01124       kXR_int32 checkfq = fgMgr->SessionMgr()->CheckFrequency();
01125 
01126       // If asynchronous return the timeout for an answer
01127       if (asyncopt == 1) {
01128          TRACEP(this, DBG, "EXT: async: notifying timeout to client: "<<checkfq<<" secs");
01129          response->Send(kXR_ok, checkfq);
01130       }
01131 
01132       // Admin path
01133       XrdOucString path(xps->AdminPath());
01134       if (path.length() <= 0) {
01135          TRACEP(this,  XERR, "EXT: admin path is empty! - protocol error");
01136          if (asyncopt == 0)
01137             response->Send(kXP_ServerError, "EXT: admin path is empty! - protocol error");
01138          return 0;
01139       }
01140       path += ".status";
01141 
01142       // Current time
01143       int now = time(0);
01144 
01145       // Stat the admin file
01146       struct stat st0;
01147       if (stat(path.c_str(), &st0) != 0) {
01148          TRACEP(this,  XERR, "EXT: cannot stat admin path: "<<path);
01149          if (asyncopt == 0)
01150             response->Send(kXP_ServerError, "EXT: cannot stat admin path");
01151          return 0;
01152       }
01153 
01154       // Take the pid
01155       int pid = xps->SrvPID();
01156       // If the session is alive ...
01157       if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01158          // If it as not touched during the last ~checkfq secs we ask for a refresh
01159          if ((now - st0.st_mtime) > checkfq - 5) {
01160             // Send the request (asking for further propagation)
01161             if (xps->VerifyProofServ(1) != 0) {
01162                TRACEP(this,  XERR, "EXT: could not send verify request to proofsrv");
01163                if (asyncopt == 0)
01164                   response->Send(kXP_ServerError, "EXT: could not verify reuqest to proofsrv");
01165                return 0;
01166             }
01167             // Wait for the action for checkfq secs, checking every 1 sec
01168             struct stat st1;
01169             int ns = checkfq;
01170             while (ns--) {
01171                if (stat(path.c_str(), &st1) == 0) {
01172                   if (st1.st_mtime > st0.st_mtime) {
01173                      pingres = 1;
01174                      break;
01175                   }
01176                }
01177                // Wait 1 sec
01178                TRACEP(this, DBG, "EXT: waiting "<<ns<<" secs for session "<<pid<<
01179                                  " to touch the admin path");
01180                sleep(1);
01181             }
01182 
01183          } else {
01184             // Session is alive
01185             pingres = 1;
01186          }
01187       } else {
01188          // Session is dead
01189          pingres = 0;
01190       }
01191 
01192       // Notify the client
01193       TRACEP(this, DBG, "EXT: notified the result to client: "<<pingres);
01194       if (asyncopt == 0) {
01195          response->Send(kXR_ok, pingres);
01196       } else {
01197          // Prepare buffer for asynchronous notification
01198          int len = sizeof(kXR_int32);
01199          char *buf = new char[len];
01200          // Option
01201          kXR_int32 ifw = (kXR_int32)0;
01202          ifw = static_cast<kXR_int32>(htonl(ifw));
01203          memcpy(buf, &ifw, sizeof(kXR_int32));
01204          response->Send(kXR_attn, kXPD_ping, buf, len);
01205       }
01206       return 0;
01207    } else if (psid > -1)  {
01208       // This is a failure for connections to sessions
01209       TRACEP(this, XERR, "session ID not found: "<<psid);
01210    }
01211 
01212    // Send the result
01213    response->Send(kXR_ok, pingres);
01214 
01215    // Done
01216    return 0;
01217 }
01218 
01219 //___________________________________________________________________________
01220 void XrdProofdProtocol::PostSession(int on, const char *u, const char *g,
01221                                     XrdProofdProofServ *xps)
01222 {
01223    // Post change of session status
01224    XPDLOC(ALL, "Protocol::PostSession")
01225 
01226    // Tell the priority manager
01227    if (fgMgr && fgMgr->PriorityMgr()) {
01228       int pid = (xps) ? xps->SrvPID() : -1;
01229       if (pid < 0) {
01230          TRACE(XERR, "undefined session or process id");
01231          return;
01232       }
01233       XrdOucString buf;
01234       XPDFORM(buf, "%d %s %s %d", on, u, g, pid);
01235 
01236       if (fgMgr->PriorityMgr()->Pipe()->Post(XrdProofdPriorityMgr::kChangeStatus,
01237                                              buf.c_str()) != 0) {
01238          TRACE(XERR, "problem posting the prority manager pipe");
01239       }
01240    }
01241    // Tell the scheduler
01242    if (fgMgr && fgMgr->ProofSched()) {
01243       if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) {
01244          TRACE(DBG, "posting the scheduler pipe");
01245          if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
01246             TRACE(XERR, "problem posting the scheduler pipe");
01247          }
01248       }
01249    }
01250    // Tell the session manager
01251    if (fgMgr && fgMgr->SessionMgr()) {
01252       if (fgMgr->SessionMgr()->Pipe()->Post(XrdProofdProofServMgr::kChgSessionSt, 0) != 0) {
01253          TRACE(XERR, "problem posting the session manager pipe");
01254       }
01255    }
01256    // Done
01257    return;
01258 }
01259 
01260 //___________________________________________________________________________
01261 void XrdProofdProtocol::TouchAdminPath()
01262 {
01263    // Recording time of the last request on this instance
01264    XPDLOC(ALL, "Protocol::TouchAdminPath")
01265 
01266    XPD_SETRESPV(this, "TouchAdminPath");
01267    TRACEP(this, HDBG, fAdminPath);
01268 
01269    if (fAdminPath.length() > 0) {
01270       int rc = 0;
01271       if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) {
01272          // In the case the file was not found and the connetion is internal
01273          // try also the terminated sessions, as the file could have been moved
01274          // in the meanwhile
01275          XrdOucString apath = fAdminPath;
01276          if (rc == -ENOENT && Internal()) {
01277             apath.replace("/activesessions/", "/terminatedsessions/");
01278             apath.replace(".status", "");
01279             rc = XrdProofdAux::Touch(apath.c_str());
01280          }
01281          if (rc != 0) {
01282             const char *type = Internal() ? "internal" : "external";
01283             TRACEP(this, XERR, type<<": problems touching "<<apath<<"; errno: "<<-rc);
01284          }
01285       }
01286    }
01287    // Done
01288    return;
01289 }
01290 
01291 //______________________________________________________________________________
01292 int XrdProofdProtocol::CtrlC()
01293 {
01294    // Set and propagate a Ctrl-C request
01295    XPDLOC(ALL, "Protocol::CtrlC")
01296 
01297    TRACEP(this, ALL, "handling request");
01298 
01299    { XrdSysMutexHelper mhp(fCtrlcMutex);
01300       fIsCtrlC = 1;
01301    }
01302 
01303    // Propagate now
01304    if (fgMgr) {
01305       if (fgMgr->SrvType() != kXPD_Worker) {
01306          if (fgMgr->NetMgr()) {
01307             fgMgr->NetMgr()->BroadcastCtrlC(Client()->User());
01308          }
01309       }
01310    }
01311 
01312    // Over
01313    return 0;
01314 }

Generated on Tue Jul 5 14:51:52 2011 for ROOT_528-00b_version by  doxygen 1.5.1