TXSocket.cxx

Go to the documentation of this file.
00001 // @(#)root/proofx:$Id: TXSocket.cxx 34428 2010-07-15 12:35:34Z ganis $
00002 // Author: Gerardo Ganis  12/12/2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TXSocket                                                             //
00015 //                                                                      //
00016 // High level handler of connections to xproofd.                        //
00017 //                                                                      //
00018 //////////////////////////////////////////////////////////////////////////
00019 
00020 #include "MessageTypes.h"
00021 #include "TEnv.h"
00022 #include "TError.h"
00023 #include "TException.h"
00024 #include "TMonitor.h"
00025 #include "TObjString.h"
00026 #include "TProof.h"
00027 #include "TSlave.h"
00028 #include "TRegexp.h"
00029 #include "TROOT.h"
00030 #include "TUrl.h"
00031 #include "TXHandler.h"
00032 #include "TXSocket.h"
00033 #include "XProofProtocol.h"
00034 
00035 #include "XrdProofConn.h"
00036 
00037 #include "XrdClient/XrdClientConnMgr.hh"
00038 #include "XrdClient/XrdClientConst.hh"
00039 #include "XrdClient/XrdClientEnv.hh"
00040 #include "XrdClient/XrdClientLogConnection.hh"
00041 #include "XrdClient/XrdClientMessage.hh"
00042 
00043 #ifndef WIN32
00044 #include <sys/socket.h>
00045 #else
00046 #include <Winsock2.h>
00047 #endif
00048 
00049 // ---- Tracing utils ----------------------------------------------------------
00050 #ifdef OLDXRDOUC
00051 #  include "XrdSysToOuc.h"
00052 #  include "XrdOuc/XrdOucError.hh"
00053 #  include "XrdOuc/XrdOucLogger.hh"
00054 #else
00055 #  include "XrdSys/XrdSysError.hh"
00056 #  include "XrdSys/XrdSysLogger.hh"
00057 #endif
00058 #include "XrdProofdTrace.h"
00059 XrdOucTrace *XrdProofdTrace = 0;        // Trace object; allocated by the TXSocket constructor while still null
00060 static XrdSysLogger eLogger;            // Logger attached to eDest by the TXSocket constructor
00061 static XrdSysError eDest(0, "Proofx");  // Error destination passed to the XrdOucTrace object on creation
00062 
00063 #ifdef WIN32
00064 ULong64_t TSocket::fgBytesSent;   // Definition of this TSocket static member, compiled only when WIN32 is defined
00065 ULong64_t TSocket::fgBytesRecv;   // Same here (NOTE(review): presumably a Windows linking workaround - confirm)
00066 #endif
00067 
00068 //______________________________________________________________________________
00069 
00070 //---- error handling ----------------------------------------------------------
00071 
00072 //______________________________________________________________________________
00073 void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
00074 {
00075    // Protected hook: hand the message to the global ErrorHandler, with 'location' prefixed by the class name.
00076    const char *where = Form("TXSocket::%s", location);
00077    ::ErrorHandler(level, where, fmt, va);
00078 }
00079 
00080 //----- Ping handler -----------------------------------------------------------
00081 //______________________________________________________________________________
00082 class TXSocketPingHandler : public TFileHandler {
00083 private:
00084    TXSocket  *fSocket;   // socket pinged by Notify()
00085 public:
00086    TXSocketPingHandler(TXSocket *s, Int_t fd) : TFileHandler(fd, 1), fSocket(s) { }
00087    Bool_t ReadNotify() { return Notify(); }   // a read notification is treated like any other one
00088    Bool_t Notify();
00089 };
00090 
00091 //______________________________________________________________________________
00092 Bool_t TXSocketPingHandler::Notify()
00093 {
00094    // Notification callback: issue a ping on the associated socket,
00095    // passing the tag "ping handler". Always returns kTRUE.
00096    fSocket->Ping("ping handler");
00097    return kTRUE;
00098 }
00099 
00100 // Environment initialization flag: the constructor calls InitEnvs() while this is kFALSE
00101 Bool_t TXSocket::fgInitDone = kFALSE;
00102 
00103 // Static variables for input notification (class-wide, not per instance)
00104 TXSockPipe   TXSocket::fgPipe;               // Pipe for input monitoring; the constructor bails out if it is invalid
00105 TString      TXSocket::fgLoc = "undef";      // Location string, changed via SetLocation()
00106 
00107 // Static buffer manager
00108 TMutex       TXSocket::fgSMtx;               // To protect spare list
00109 std::list<TXSockBuf *> TXSocket::fgSQue;     // list of spare buffers
00110 Long64_t     TXSockBuf::fgBuffMem = 0;       // Total allocated memory
00111 Long64_t     TXSockBuf::fgMemMax = 10485760; // Max allowed allocated memory [10 MB = 10 * 1024 * 1024 bytes]
00112 
00113 //_____________________________________________________________________________
00114 TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
00115                    const char *logbuf, Int_t loglevel, TXHandler *handler)
00116          : TSocket(), fMode(m), fLogLevel(loglevel),
00117            fBuffer(logbuf), fASem(0), fAsynProc(1),
00118            fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1)
00119 {
00120    // Constructor.
00121    // Open the connection to the remote XrdProofd instance at 'url' and, for modes
00122    // 'M', 'A', 'm' and 's', create or attach to a PROOF session. Meaning of 'm':
00123    //     'a'      Administrator; used by an XPD to contact the head XPD
00124    //     'i'      Internal; used by a TXProofServ to call back its creator
00125    //              (see XrdProofUnixConn)
00126    //     'C'      PROOF manager: open connection only (do not start a session)
00127    //     'M'      Client creating a top master
00128    //     'A'      Client attaching to top master
00129    //     'm'      Top master creating a submaster
00130    //     's'      Master creating a slave
00131    // 'logbuf' is a null terminated string sent over at login; 'psid' becomes the
00132    // session ID (forced to -1 in mode 'C'); 'handler' is the asynchronous handler.
00133 
00134    fConn = 0;  // so that Close(), run by the destructor, is safe after any early return
00135    fUrl = url;
00136    // Enable tracing in the XrdProof client. if not done already
00137    eDest.logger(&eLogger);
00138    if (!XrdProofdTrace)
00139       XrdProofdTrace = new XrdOucTrace(&eDest);
00140 
00141    // Init envs the first time
00142    if (!fgInitDone)
00143       InitEnvs();
00144 
00145    // Async queue related stuff
00146    if (!(fAMtx = new TMutex(kTRUE))) {
00147       Error("TXSocket", "problems initializing mutex for async queue");
00148       return;
00149    }
00150    fAQue.clear();
00151 
00152    // Interrupts queue related stuff
00153    if (!(fIMtx = new TMutex(kTRUE))) {
00154       Error("TXSocket", "problems initializing mutex for interrupts");
00155       return;
00156    }
00157    fILev = -1;
00158    fIForward = kFALSE;
00159 
00160    // Init some variables
00161    fByteLeft = 0;
00162    fByteCur = 0;
00163    fBufCur = 0;
00164    fServType = kPROOFD; // for consistency
00165    fTcpWindowSize = -1;
00166    fRemoteProtocol = -1;
00167    // By default forward directly to end-point
00168    fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
00169    fSessionID = (fMode == 'C') ? -1 : psid;
00170    fSocket = -1;
00171 
00172    // This is used by external code to create a link between this object
00173    // and another one
00174    fReference = 0;
00175 
00176    // The global pipe
00177    if (!fgPipe.IsValid()) {
00178       Error("TXSocket", "internal pipe is invalid");
00179       return;
00180    }
00181 
00182    // Some initial values
00183    TUrl u(url);
00184    fAddress = gSystem->GetHostByName(u.GetHost());
00185    u.SetProtocol("proof", kTRUE);
00186    fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
00187 
00188    // Set the asynchronous handler
00189    fHandler = handler;
00190 
00191    if (url) {
00192 
00193       // Create connection (for managers the type of the connection is the same
00194       // as for top masters)
00195       char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
00196       fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
00197       if (!fConn || !(fConn->IsValid())) {
00198          // Report in debug mode, unless the server type is kSTProofd; fConn may be null here
00199          if (fConn && fConn->GetServType() != XrdProofConn::kSTProofd && gDebug > 0)
00200             Error("TXSocket", "fatal error occurred while opening a connection"
00201                               " to server [%s]: %s", url, fConn->GetLastErr());
00202          return;
00203       }
00204 
00205       // Create new proofserv if not client manager or administrator or internal mode
00206       if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
00207          // We attach or create
00208          if (!Create()) {
00209             // Failure
00210             Error("TXSocket", "create or attach failed (%s)",
00211                   ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
00212             Close();
00213             return;
00214          }
00215       }
00216 
00217       // Fill some info
00218       fUser = fConn->fUser.c_str();
00219       fHost = fConn->fHost.c_str();
00220       fPort = fConn->fPort;
00221       if (fMode == 'C') {
00222          fXrdProofdVersion = fConn->fRemoteProtocol;
00223          fRemoteProtocol = fConn->fRemoteProtocol;
00224       }
00225 
00226       // Also in the base class
00227       fUrl = fConn->fUrl.GetUrl().c_str();
00228       fAddress = gSystem->GetHostByName(fConn->fUrl.Host.c_str());
00229       fAddress.fPort = fPort;
00230 
00231       // This is needed for the reader thread to signal an interrupt
00232       fPid = gSystem->GetPid();
00233    }
00234 }
00235 
00236 //______________________________________________________________________________
00237 TXSocket::TXSocket(const TXSocket &s) : TSocket(s),XrdClientAbsUnsolMsgHandler(s)
00238 {  // NOTE(review): presumably not meant to be called (copying looks unsupported) - confirm against the header
00239    // TXSocket copy ctor: copy-constructs the two base classes from 's'; the body is empty, so no TXSocket data member is copied.
00240 }
00241 
00242 //______________________________________________________________________________
00243 TXSocket& TXSocket::operator=(const TXSocket&)
00244 {
00245    // TXSocket assignment operator: ignores its argument and copies nothing.
00246    return *this;   // object left unchanged
00247 }
00248 
00249 //_____________________________________________________________________________
00250 TXSocket::~TXSocket()
00251 {
00252    // Destructor: closes the connection, then frees the two mutexes.
00253 
00254    // Disconnect from remote server (the connection manager is
00255    // responsible for the underlying physical connection, so we do not
00256    // force its closing)
00257    Close();
00258 
00259    // Delete mutexes (created in the constructor for the async queue and for interrupts)
00260    SafeDelete(fAMtx);
00261    SafeDelete(fIMtx);
00262 }
00263 
00264 //______________________________________________________________________________
00265 void TXSocket::SetLocation(const char *loc)
00266 {
00267    // Record the location string, both in the static member and in the
00268    // global pipe. A null 'loc' is stored as an empty string.
00269 
00270    // Normalize the argument once
00271    const char *where = loc ? loc : "";
00272 
00273    // Keep the two copies in step
00274    fgLoc = where;
00275    fgPipe.SetLoc(where);
00276 }
00277 
00278 //_____________________________________________________________________________
00279 void TXSocket::SetSessionID(Int_t id)
00280 {
00281    // Store 'id' as session ID; a negative 'id' also switches off the asynchronous handler of the connection, if any.
00282    const Bool_t disableAsync = (id < 0) && (fConn != 0);
00283    if (disableAsync)
00284       fConn->SetAsync(0);
00285    fSessionID = id;
00286 }
00287 
00288 //_____________________________________________________________________________
00289 void TXSocket::DisconnectSession(Int_t id, Option_t *opt)
00290 {
00291    // Ask the server to release session 'id'. With "S" or "s" in 'opt' the
00292    // remote session is destroyed, otherwise it is only detached; with "A"
00293    // or "a" the request is sent even when 'id' is negative. Default opt = "".
00294 
00295    // Nothing can be done without a valid connection
00296    if (!IsValid()) {
00297       if (gDebug > 0)
00298          Info("DisconnectSession","not connected: nothing to do");
00299       return;
00300    }
00301 
00302    // Decode the option flags
00303    const Bool_t destroy = opt && (strchr(opt,'S') || strchr(opt,'s'));
00304    const Bool_t anyId = opt && (strchr(opt,'A') || strchr(opt,'a'));
00305 
00306    // A negative ID is served only on explicit request
00307    if (id <= -1 && !anyId)
00308       return;
00309 
00310    // Fill the request
00311    XPClientRequest req;
00312    memset(&req, 0, sizeof(req) );
00313    fConn->SetSID(req.header.streamid);
00314    req.proof.requestid = destroy ? kXP_destroy : kXP_detach;
00315    req.proof.sid = id;
00316 
00317    // Ship it; no payload is attached
00318    XrdClientMessage *rsp =
00319       fConn->SendReq(&req, (const void *)0, 0, "DisconnectSession");
00320 
00321    // No answer: print the last error, when there is one
00322    if (!rsp && fConn->GetLastErr())
00323       Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
00324 
00325    // The answer is owned by us
00326    SafeDelete(rsp);
00327 }
00328 
00329 //_____________________________________________________________________________
00330 void TXSocket::Close(Option_t *opt)
00331 {
00332    // Close the connection. Options (case insensitive):
00333    //   'P'   force closing of the underlying physical connection
00334    //   'S'   shutdown remote session, if any
00335    // A session ID can be passed between '#' characters, e.g. "#1#".
00336    // Default is opt = "".
00337 
00338    // Try to hold the async-processing semaphore; go on with a warning on failure
00339    Int_t tmoSec = gEnv->GetValue("XProof.AsynProcSemTimeout", 60);
00340    if (fAsynProc.Wait(tmoSec*1000) != 0)
00341       Warning("Close", "could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", tmoSec);
00342 
00343    // Drop whatever the global pipe and ready-sock queue still hold for us
00344    TXSocket::fgPipe.Flush(this);
00345 
00346    if (fConn) {
00347       // No more asynchronous requests from now on
00348       fConn->SetAsync(0);
00349 
00350       if (IsValid()) {
00351          // Session to act on: ours, unless a "#<digits>#" tag in 'opt' overrides it
00352          Int_t sid = fSessionID;
00353          TString tag(opt);
00354          Int_t hash = tag.Index("#");
00355          if (hash != kNPOS) {
00356             tag.Remove(0, hash+1);
00357             hash = tag.Index("#");
00358             if (hash != kNPOS) {
00359                tag.Remove(hash);
00360                if (tag.IsDigit())
00361                   sid = tag.Atoi();
00362             }
00363          }
00364 
00365          if (sid <= -1) {
00366             // We are the manager: close underlying connection
00367             fConn->Close(opt);
00368          } else {
00369             // Warn the remote session, if any (after destroy the session is gone)
00370             DisconnectSession(sid, opt);
00371          }
00372       }
00373 
00374       // The connection module is not needed any longer
00375       SafeDelete(fConn);
00376    } else {
00377       // Nothing to close
00378       if (gDebug > 0)
00379          Info("Close","no connection: nothing to do");
00380    }
00381 
00382    // Post the semaphore on both paths
00383    fAsynProc.Post();
00384 }
00385 
00386 //_____________________________________________________________________________
00387 UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
00388                                                     XrdClientMessage *m)
00389 {
00390    // We are here if an unsolicited response comes from a logical conn
00391    // The response comes in the form of an XrdClientMessage *, that must NOT be
00392    // destroyed after processing. It is destroyed by the first sender.
00393    // Remember that we are in a separate thread, since unsolicited
00394    // responses are asynchronous by nature.
00395    UnsolRespProcResult rc = kUNSOL_KEEP;
00396 
00397    // If we are closing we will not do anything
00398    TXSemaphoreGuard semg(&fAsynProc);
00399    if (!semg.IsValid()) {
00400       Error("ProcessUnsolicitedMsg", "%p: async semaphore taken by Close()! Should not be here!", this);
00401       return kUNSOL_CONTINUE;
00402    }
00403 
00404    if (!m) {
00405       if (gDebug > 2)
00406          Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
00407       // Some one is perhaps interested in empty messages
00408       return kUNSOL_CONTINUE;
00409    } else {
00410       if (gDebug > 2)
00411          Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
00412               this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
00413    }
00414 
00415    // Error notification
00416    if (m->IsError()) {
00417       if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
00418          if (gDebug > 0)
00419             Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
00420          XHandleErr_t herr = {1, 0};
00421          if (!fHandler || fHandler->HandleError((const void *)&herr)) {
00422             if (gDebug > 0)
00423                Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
00424             // Avoid to contact the server any more
00425             fSessionID = -1;
00426          } else {
00427             // Connection still usable: update usage timestamp
00428             Touch();
00429          }
00430       } else {
00431          // Time out
00432          if (gDebug > 2)
00433             Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
00434       }
00435       // Propagate the message to other possible handlers
00436       return kUNSOL_CONTINUE;
00437    }
00438 
00439    // From now on make sure is for us
00440    if (!fConn || !m->MatchStreamid(fConn->fStreamid)) {
00441       if (gDebug > 1)
00442          Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
00443       return kUNSOL_CONTINUE;
00444    }
00445 
00446    // Local processing ...
00447    if (!m) {
00448       Error("ProcessUnsolicitedMsg", "undefined message - disabling");
00449       PostMsg(kPROOF_STOP);
00450       return rc;
00451    }
00452 
00453    Int_t len = 0;
00454    if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
00455       Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
00456       PostMsg(kPROOF_STOP);
00457       return rc;
00458    }
00459 
00460    // Activity on the line: update usage timestamp
00461    Touch();
00462 
00463    // The first 4 bytes contain the action code
00464    kXR_int32 acod = 0;
00465    memcpy(&acod, m->GetData(), sizeof(kXR_int32));
00466    if (acod > 10000)
00467          Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
00468               this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
00469    //
00470    // Update pointer to data
00471    void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
00472    len -= sizeof(kXR_int32);
00473    if (gDebug > 1)
00474       Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
00475            this, acod, len, m->HeaderSID());
00476 
00477    if (gDebug > 3)
00478       fgPipe.DumpReadySock();
00479 
00480    // Case by case
00481    kXR_int32 ilev = -1;
00482    const char *lab = 0;
00483 
00484    switch (acod) {
00485       case kXPD_ping:
00486          //
00487          // Special interrupt
00488          ilev = TProof::kPing;
00489          lab = "kXPD_ping";
00490       case kXPD_interrupt:
00491          //
00492          // Interrupt
00493          lab = !lab ? "kXPD_interrupt" : lab;
00494          { R__LOCKGUARD(fIMtx);
00495             if (acod == kXPD_interrupt) {
00496                memcpy(&ilev, pdata, sizeof(kXR_int32));
00497                ilev = net2host(ilev);
00498                // Update pointer to data
00499                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00500                len -= sizeof(kXR_int32);
00501             }
00502             // The next 4 bytes contain the forwarding option
00503             kXR_int32 ifw = 0;
00504             if (len > 0) {
00505                memcpy(&ifw, pdata, sizeof(kXR_int32));
00506                ifw = net2host(ifw);
00507                if (gDebug > 1)
00508                   Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
00509             }
00510             //
00511             // Save the interrupt
00512             fILev = ilev;
00513             fIForward = (ifw == 1) ? kTRUE : kFALSE;
00514 
00515             // Handle this input in this thread to avoid queuing on the
00516             // main thread
00517             XHandleIn_t hin = {acod, 0, 0, 0};
00518             if (fHandler)
00519                fHandler->HandleInput((const void *)&hin);
00520             else
00521                Error("ProcessUnsolicitedMsg","handler undefined");
00522          }
00523          break;
00524       case kXPD_timer:
00525          //
00526          // Set shutdown timer
00527          {
00528             kXR_int32 opt = 1;
00529             kXR_int32 delay = 0;
00530             // The next 4 bytes contain the shutdown option
00531             if (len > 0) {
00532                memcpy(&opt, pdata, sizeof(kXR_int32));
00533                opt = net2host(opt);
00534                if (gDebug > 1)
00535                   Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
00536                // Update pointer to data
00537                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00538                len -= sizeof(kXR_int32);
00539             }
00540             // The next 4 bytes contain the delay
00541             if (len > 0) {
00542                memcpy(&delay, pdata, sizeof(kXR_int32));
00543                delay = net2host(delay);
00544                if (gDebug > 1)
00545                   Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
00546                // Update pointer to data
00547                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00548                len -= sizeof(kXR_int32);
00549             }
00550 
00551             // Handle this input in this thread to avoid queuing on the
00552             // main thread
00553             XHandleIn_t hin = {acod, opt, delay, 0};
00554             if (fHandler)
00555                fHandler->HandleInput((const void *)&hin);
00556             else
00557                Error("ProcessUnsolicitedMsg","handler undefined");
00558          }
00559          break;
00560       case kXPD_inflate:
00561          //
00562          // Set inflate factor
00563          {
00564             kXR_int32 inflate = 1000;
00565             if (len > 0) {
00566                memcpy(&inflate, pdata, sizeof(kXR_int32));
00567                inflate = net2host(inflate);
00568                if (gDebug > 1)
00569                   Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
00570                // Update pointer to data
00571                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00572                len -= sizeof(kXR_int32);
00573             }
00574             // Handle this input in this thread to avoid queuing on the
00575             // main thread
00576             XHandleIn_t hin = {acod, inflate, 0, 0};
00577             if (fHandler)
00578                fHandler->HandleInput((const void *)&hin);
00579             else
00580                Error("ProcessUnsolicitedMsg","handler undefined");
00581          }
00582          break;
00583       case kXPD_priority:
00584          //
00585          // Broadcast group priority
00586          {
00587             kXR_int32 priority = -1;
00588             if (len > 0) {
00589                memcpy(&priority, pdata, sizeof(kXR_int32));
00590                priority = net2host(priority);
00591                if (gDebug > 1)
00592                   Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
00593                // Update pointer to data
00594                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00595                len -= sizeof(kXR_int32);
00596             }
00597             // Handle this input in this thread to avoid queuing on the
00598             // main thread
00599             XHandleIn_t hin = {acod, priority, 0, 0};
00600             if (fHandler)
00601                fHandler->HandleInput((const void *)&hin);
00602             else
00603                Error("ProcessUnsolicitedMsg","handler undefined");
00604          }
00605          break;
00606       case kXPD_flush:
00607          //
00608          // Flush request
00609          {
00610             // Handle this input in this thread to avoid queuing on the
00611             // main thread
00612             XHandleIn_t hin = {acod, 0, 0, 0};
00613             if (fHandler)
00614                fHandler->HandleInput((const void *)&hin);
00615             else
00616                Error("ProcessUnsolicitedMsg","handler undefined");
00617          }
00618          break;
00619       case kXPD_urgent:
00620          //
00621          // Set shutdown timer
00622          {
00623             // The next 4 bytes contain the urgent msg type
00624             kXR_int32 type = -1;
00625             if (len > 0) {
00626                memcpy(&type, pdata, sizeof(kXR_int32));
00627                type = net2host(type);
00628                if (gDebug > 1)
00629                   Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
00630                // Update pointer to data
00631                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00632                len -= sizeof(kXR_int32);
00633             }
00634             // The next 4 bytes contain the first info container
00635             kXR_int32 int1 = -1;
00636             if (len > 0) {
00637                memcpy(&int1, pdata, sizeof(kXR_int32));
00638                int1 = net2host(int1);
00639                if (gDebug > 1)
00640                   Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
00641                // Update pointer to data
00642                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00643                len -= sizeof(kXR_int32);
00644             }
00645             // The next 4 bytes contain the second info container
00646             kXR_int32 int2 = -1;
00647             if (len > 0) {
00648                memcpy(&int2, pdata, sizeof(kXR_int32));
00649                int2 = net2host(int2);
00650                if (gDebug > 1)
00651                   Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
00652                // Update pointer to data
00653                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00654                len -= sizeof(kXR_int32);
00655             }
00656 
00657             // Handle this input in this thread to avoid queuing on the
00658             // main thread
00659             XHandleIn_t hin = {acod, type, int1, int2};
00660             if (fHandler)
00661                fHandler->HandleInput((const void *)&hin);
00662             else
00663                Error("ProcessUnsolicitedMsg","handler undefined");
00664          }
00665          break;
00666       case kXPD_msg:
00667          //
00668          // Data message
00669          {  R__LOCKGUARD(fAMtx);
00670 
00671             // Get a spare buffer
00672             TXSockBuf *b = PopUpSpare(len);
00673             if (!b) {
00674                Error("ProcessUnsolicitedMsg","could allocate spare buffer");
00675                return rc;
00676             }
00677             memcpy(b->fBuf, pdata, len);
00678             b->fLen = len;
00679 
00680             // Update counters
00681             fBytesRecv += len;
00682 
00683             // Produce the message
00684             fAQue.push_back(b);
00685 
00686             // Post the global pipe
00687             fgPipe.Post(this);
00688 
00689             // Signal it and release the mutex
00690             if (gDebug > 2)
00691                Info("ProcessUnsolicitedMsg","%p: %s: posting semaphore: %p (%d bytes)",
00692                                             this, GetTitle(), &fASem, len);
00693             fASem.Post();
00694          }
00695 
00696          break;
00697       case kXPD_feedback:
00698          Info("ProcessUnsolicitedMsg",
00699               "kXPD_feedback treatment not yet implemented");
00700          break;
00701       case kXPD_srvmsg:
00702          //
00703          // Service message
00704          {
00705             // The next 4 bytes may contain a flag to control the way the message is displayed
00706             kXR_int32 opt = 0;
00707             memcpy(&opt, pdata, sizeof(kXR_int32));
00708             opt = net2host(opt);
00709             if (opt >= 0 && opt <= 4) {
00710                // Update pointer to data
00711                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00712                len -= sizeof(kXR_int32);
00713             } else {
00714                opt = 1;
00715             }
00716 
00717             if (opt == 0) {
00718                // One line
00719                Printf("| %.*s", len, (char *)pdata);
00720             } else if (opt == 2) {
00721                // Raw displaying
00722                Printf("%.*s", len, (char *)pdata);
00723             } else if (opt == 3) {
00724                // Incremental displaying
00725                fprintf(stderr, "%.*s", len, (char *)pdata);
00726             } else if (opt == 4) {
00727                // Rewind
00728                fprintf(stderr, "%.*s\r", len, (char *)pdata);
00729             } else {
00730                // A small header
00731                Printf(" ");
00732                Printf("| Message from server:");
00733                Printf("| %.*s", len, (char *)pdata);
00734             }
00735          }
00736          break;
00737       case kXPD_errmsg:
00738          //
00739          // Error condition with message
00740          Printf(" ");
00741          Printf("| Error condition occured: message from server:");
00742          Printf("| %.*s", len, (char *)pdata);
00743          // Handle error
00744          if (fHandler)
00745             fHandler->HandleError();
00746          else
00747             Error("ProcessUnsolicitedMsg","handler undefined");
00748          break;
00749       case kXPD_msgsid:
00750          //
00751          // Data message
00752          { R__LOCKGUARD(fAMtx);
00753 
00754             // The next 4 bytes contain the sessiond id
00755             kXR_int32 cid = 0;
00756             memcpy(&cid, pdata, sizeof(kXR_int32));
00757             cid = net2host(cid);
00758 
00759             if (gDebug > 1)
00760                Info("ProcessUnsolicitedMsg","found cid: %d", cid);
00761 
00762             // Update pointer to data
00763             pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00764             len -= sizeof(kXR_int32);
00765 
00766             // Get a spare buffer
00767             TXSockBuf *b = PopUpSpare(len);
00768             if (!b) {
00769                Error("ProcessUnsolicitedMsg","could allocate spare buffer");
00770                return rc;
00771             }
00772             memcpy(b->fBuf, pdata, len);
00773             b->fLen = len;
00774 
00775             // Set the sid
00776             b->fCid = cid;
00777 
00778             // Update counters
00779             fBytesRecv += len;
00780 
00781             // Produce the message
00782             fAQue.push_back(b);
00783 
00784             // Post the global pipe
00785             fgPipe.Post(this);
00786 
00787             // Signal it and release the mutex
00788             if (gDebug > 2)
00789                Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
00790                     this, cid, &fASem, len);
00791             fASem.Post();
00792          }
00793 
00794          break;
00795       case kXPD_wrkmortem:
00796          //
00797          // A worker died
00798          {  TString what = TString::Format("%.*s", len, (char *)pdata);
00799             if (what.BeginsWith("idle-timeout")) {
00800                // Notify the idle timeout
00801                PostMsg(kPROOF_FATAL, kPROOF_WorkerIdleTO);
00802             } else {
00803                Printf(" ");
00804                Printf("| %s", what.Data());
00805                // Handle error
00806                if (fHandler)
00807                   fHandler->HandleError();
00808                else
00809                   Error("ProcessUnsolicitedMsg","handler undefined");
00810             }
00811          }
00812          break;
00813 
00814       case kXPD_touch:
00815          //
00816          // Request for remote touch: post a message to do that
00817          PostMsg(kPROOF_TOUCH);
00818          break;
00819       case kXPD_resume:
00820          //
00821          // process the next query (in the TXProofServ)
00822          PostMsg(kPROOF_STARTPROCESS);
00823          break;
00824       case kXPD_clusterinfo:
00825          //
00826          // Broadcast cluster information
00827          {
00828             kXR_int32 nsess = -1, nacti = -1, neffs = -1;
00829             if (len > 0) {
00830                // Total sessions
00831                memcpy(&nsess, pdata, sizeof(kXR_int32));
00832                nsess = net2host(nsess);
00833                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00834                len -= sizeof(kXR_int32);
00835                // Active sessions
00836                memcpy(&nacti, pdata, sizeof(kXR_int32));
00837                nacti = net2host(nacti);
00838                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00839                len -= sizeof(kXR_int32);
00840                // Effective sessions
00841                memcpy(&neffs, pdata, sizeof(kXR_int32));
00842                neffs = net2host(neffs);
00843                pdata = (void *)((char *)pdata + sizeof(kXR_int32));
00844                len -= sizeof(kXR_int32);
00845             }
00846             if (gDebug > 1)
00847                Info("ProcessUnsolicitedMsg","kXPD_clusterinfo: # sessions: %d,"
00848                     " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
00849             // Handle this input in this thread to avoid queuing on the
00850             // main thread
00851             XHandleIn_t hin = {acod, nsess, nacti, neffs};
00852             if (fHandler)
00853                fHandler->HandleInput((const void *)&hin);
00854             else
00855                Error("ProcessUnsolicitedMsg","handler undefined");
00856          }
00857          break;
00858      default:
00859          Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
00860                                        this, acod, GetTitle());
00861          PostMsg(kPROOF_STOP);
00862          break;
00863    }
00864 
00865    // We are done
00866    return rc;
00867 }
00868 
00869 //_______________________________________________________________________
00870 void TXSocket::PostMsg(Int_t type, const char *msg)
00871 {
00872    // Post a message of type 'type' into the read messages queue.
00873    // If 'msg' is defined it is also added as TString.
00874    // This is used, for example, with kPROOF_FATAL to force the main thread
00875    // to mark this socket as bad, avoiding race condition when a worker
00876    // dies while in processing state.
00877 
00878    // Create the message
00879    TMessage m(type);
00880 
00881    // Add the string if any
00882    if (msg && strlen(msg) > 0)
00883       m << TString(msg);
00884 
00885    // Write length in first word of buffer
00886    m.SetLength();
00887 
00888    // Get pointer to the message buffer
00889    char *mbuf = m.Buffer();
00890    Int_t mlen = m.Length();
00891    if (m.CompBuffer()) {
00892       mbuf = m.CompBuffer();
00893       mlen = m.CompLength();
00894    }
00895 
00896    //
00897    // Data message
00898    R__LOCKGUARD(fAMtx);
00899 
00900    // Get a spare buffer
00901    TXSockBuf *b = PopUpSpare(mlen);
00902    if (!b) {
00903       Error("PostMsg", "could allocate spare buffer");
00904       return;
00905    }
00906 
00907    // Fill the pipe buffer
00908    memcpy(b->fBuf, mbuf, mlen);
00909    b->fLen = mlen;
00910 
00911    // Update counters
00912    fBytesRecv += mlen;
00913 
00914    // Produce the message
00915    fAQue.push_back(b);
00916 
00917    // Post the global pipe
00918    fgPipe.Post(this);
00919 
00920    // Signal it and release the mutex
00921    if (gDebug > 0)
00922       Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
00923                           this, type, &fASem, mlen);
00924    fASem.Post();
00925 
00926    // Done
00927    return;
00928 }
00929 
00930 //____________________________________________________________________________
00931 Bool_t TXSocket::IsServProofd()
00932 {
00933    // Return kTRUE if the remote server is a 'proofd'
00934 
00935    if (fConn && (fConn->GetServType() == XrdProofConn::kSTProofd))
00936       return kTRUE;
00937 
00938    // Failure
00939    return kFALSE;
00940 }
00941 
00942 //_____________________________________________________________________________
00943 Int_t TXSocket::GetInterrupt(Bool_t &forward)
00944 {
00945    // Get latest interrupt level and reset it; if the interrupt has to be
00946    // propagated to lower stages forward will be kTRUE after the call
00947 
00948    if (gDebug > 2)
00949       Info("GetInterrupt","%p: waiting to lock mutex %p", this, fIMtx);
00950 
00951    R__LOCKGUARD(fIMtx);
00952 
00953    // Reset values
00954    Int_t ilev = -1;
00955    forward = kFALSE;
00956 
00957    // Check if filled
00958    if (fILev == -1)
00959       Error("GetInterrupt", "value is unset (%d) - protocol error",fILev);
00960 
00961    // Fill output
00962    ilev = fILev;
00963    forward = fIForward;
00964 
00965    // Reset values (we process it only once)
00966    fILev = -1;
00967    fIForward = kFALSE;
00968 
00969    // Return what we got
00970    return ilev;
00971 }
00972 
00973 //_____________________________________________________________________________
00974 Int_t TXSocket::Flush()
00975 {
00976    // Flush the asynchronous queue.
00977    // Typically called when a kHardInterrupt is received.
00978    // Returns number of bytes in flushed buffers.
00979 
00980    Int_t nf = 0;
00981    list<TXSockBuf *> splist;
00982    list<TXSockBuf *>::iterator i;
00983 
00984    {  R__LOCKGUARD(fAMtx);
00985 
00986       // Must have something to flush
00987       if (fAQue.size() > 0) {
00988 
00989          // Save size for later semaphore cleanup
00990          Int_t sz = fAQue.size();
00991          // get the highest interrupt level
00992          for (i = fAQue.begin(); i != fAQue.end();) {
00993             if (*i) {
00994                splist.push_back(*i);
00995                nf += (*i)->fLen;
00996                i = fAQue.erase(i);
00997             }
00998          }
00999 
01000          // Reset the asynchronous queue
01001          while (sz--)
01002             fASem.TryWait();
01003          fAQue.clear();
01004       }
01005    }
01006 
01007    // Move spares to the spare queue
01008    if (splist.size() > 0) {
01009       R__LOCKGUARD(&fgSMtx);
01010       for (i = splist.begin(); i != splist.end();) {
01011          fgSQue.push_back(*i);
01012          i = splist.erase(i);
01013       }
01014    }
01015 
01016    // We are done
01017    return nf;
01018 }
01019 
01020 //_____________________________________________________________________________
01021 Bool_t TXSocket::Create(Bool_t attach)
01022 {
01023    // This method sends a request for creation of (or attachment to) a remote
01024    // server application.
01025 
01026    // Make sure we are connected
01027    if (!IsValid()) {
01028       if (gDebug > 0)
01029          Info("Create","not connected: nothing to do");
01030       return kFALSE;
01031    }
01032 
01033    Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);
01034 
01035    while (retriesleft--) {
01036 
01037       XPClientRequest reqhdr;
01038 
01039       // We fill the header struct containing the request for login
01040       memset( &reqhdr, 0, sizeof(reqhdr));
01041       fConn->SetSID(reqhdr.header.streamid);
01042 
01043       // This will be a kXP_attach or kXP_create request
01044       if (fMode == 'A' || attach) {
01045          reqhdr.header.requestid = kXP_attach;
01046          reqhdr.proof.sid = fSessionID;
01047       } else {
01048          reqhdr.header.requestid = kXP_create;
01049       }
01050 
01051       // Send log level
01052       reqhdr.proof.int1 = fLogLevel;
01053 
01054       // Send also the chosen alias
01055       const void *buf = (const void *)(fBuffer.Data());
01056       reqhdr.header.dlen = fBuffer.Length();
01057       if (gDebug >= 2)
01058          Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
01059 
01060       // We call SendReq, the function devoted to sending commands.
01061       if (gDebug > 1)
01062          Info("Create", "creating session of server %s", fUrl.Data());
01063 
01064       // server response header
01065       char *answData = 0;
01066       XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
01067                                               &answData, "TXSocket::Create", 0);
01068       struct ServerResponseBody_Protocol *srvresp = (struct ServerResponseBody_Protocol *)answData;
01069 
01070       // In any, the URL the data pool entry point will be stored here
01071       fBuffer = "";
01072       if (xrsp) {
01073 
01074          //
01075          // Pointer to data
01076          void *pdata = (void *)(xrsp->GetData());
01077          Int_t len = xrsp->DataLen();
01078 
01079          if (len >= (Int_t)sizeof(kXR_int32)) {
01080             // The first 4 bytes contain the session ID
01081             kXR_int32 psid = 0;
01082             memcpy(&psid, pdata, sizeof(kXR_int32));
01083             fSessionID = net2host(psid);
01084             pdata = (void *)((char *)pdata + sizeof(kXR_int32));
01085             len -= sizeof(kXR_int32);
01086          } else {
01087             Error("Create","session ID is undefined!");
01088          }
01089 
01090          if (len >= (Int_t)sizeof(kXR_int16)) {
01091             // The second 2 bytes contain the remote PROOF protocol version
01092             kXR_int16 dver = 0;
01093             memcpy(&dver, pdata, sizeof(kXR_int16));
01094             fRemoteProtocol = net2host(dver);
01095             pdata = (void *)((char *)pdata + sizeof(kXR_int16));
01096             len -= sizeof(kXR_int16);
01097          } else {
01098             Warning("Create","protocol version of the remote PROOF undefined!");
01099          }
01100 
01101          if (fRemoteProtocol == 0) {
01102             // We are dealing with an older server: the PROOF protocol is on 4 bytes
01103             len += sizeof(kXR_int16);
01104             kXR_int32 dver = 0;
01105             memcpy(&dver, pdata, sizeof(kXR_int32));
01106             fRemoteProtocol = net2host(dver);
01107             pdata = (void *)((char *)pdata + sizeof(kXR_int32));
01108             len -= sizeof(kXR_int32);
01109          } else {
01110             if (len >= (Int_t)sizeof(kXR_int16)) {
01111                // The third 2 bytes contain the remote XrdProofdProtocol version
01112                kXR_int16 dver = 0;
01113                memcpy(&dver, pdata, sizeof(kXR_int16));
01114                fXrdProofdVersion = net2host(dver);
01115                pdata = (void *)((char *)pdata + sizeof(kXR_int16));
01116                len -= sizeof(kXR_int16);
01117             } else {
01118                Warning("Create","version of the remote XrdProofdProtocol undefined!");
01119             }
01120          }
01121 
01122          if (len > 0) {
01123             // From top masters, the url of the data pool
01124             char *url = new char[len+1];
01125             memcpy(url, pdata, len);
01126             url[len] = 0;
01127             fBuffer = url;
01128             delete[] url;
01129          }
01130 
01131          // Cleanup
01132          SafeDelete(xrsp);
01133          if (srvresp)
01134             free(srvresp);
01135 
01136          // Notify
01137          return kTRUE;
01138       } else {
01139          // If not free resources now, just give up
01140          if (fConn->GetOpenError() == kXP_TooManySess) {
01141             // Avoid to contact the server any more
01142             fSessionID = -1;
01143             return kFALSE;
01144          } else {
01145             // Print error mag, if any
01146             if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr())
01147                Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01148          }
01149       }
01150 
01151       if (gDebug > 0)
01152          Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
01153       if (retriesleft <= 0)
01154          Error("Create", "%d creation/attachment attempts failed: no attempts left",
01155                          gEnv->GetValue("XProof.CreationRetries", 4));
01156 
01157    } // Creation retries
01158 
01159    // Notify failure
01160    Error("Create:",
01161          "problems creating or attaching to a remote server (%s)",
01162          ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
01163    return kFALSE;
01164 }
01165 
01166 //______________________________________________________________________________
01167 Int_t TXSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
01168 {
01169    // Send a raw buffer of specified length.
01170    // Use opt = kDontBlock to ask xproofd to push the message into the proofsrv.
01171    // (by default is appended to a queue waiting for a request from proofsrv).
01172    // Returns the number of bytes sent or -1 in case of error.
01173 
01174    TSystem::ResetErrno();
01175 
01176    // Options and request ID
01177    fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
01178                                   : (~kXPD_async & fSendOpt) ;
01179 
01180    // Prepare request
01181    XPClientRequest Request;
01182    memset( &Request, 0, sizeof(Request) );
01183    fConn->SetSID(Request.header.streamid);
01184    Request.sendrcv.requestid = kXP_sendmsg;
01185    Request.sendrcv.sid = fSessionID;
01186    Request.sendrcv.opt = fSendOpt;
01187    Request.sendrcv.cid = GetClientID();
01188    Request.sendrcv.dlen = length;
01189    if (gDebug >= 2)
01190       Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);
01191 
01192    // Send request
01193    XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");
01194 
01195    if (xrsp) {
01196       // Prepare return info
01197       Int_t nsent = length;
01198 
01199       // Update counters
01200       fBytesSent += length;
01201 
01202       // Cleanup
01203       SafeDelete(xrsp);
01204 
01205       // Success: update usage timestamp
01206       Touch();
01207 
01208       // ok
01209       return nsent;
01210    } else {
01211       // Print error message, if any
01212       if (fConn->GetLastErr())
01213          Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01214       else
01215          Printf("%s: error occured but no message from server", fHost.Data());
01216    }
01217 
01218    // Failure notification (avoid using the handler: we may be exiting)
01219    Error("SendRaw", "%s: problems sending %d bytes to server",
01220                     fHost.Data(), length);
01221    return -1;
01222 }
01223 
01224 //______________________________________________________________________________
01225 Bool_t TXSocket::Ping(const char *ord)
01226 {
01227    // Ping functionality: contact the server to check its vitality.
01228    // If external, the server waits for a reply from the server
01229    // Returns kTRUE if OK or kFALSE in case of error.
01230 
01231    TSystem::ResetErrno();
01232 
01233    if (gDebug > 0)
01234       Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);
01235 
01236    // Make sure we are connected
01237    if (!IsValid()) {
01238       Error("Ping","not connected: nothing to do");
01239       return kFALSE;
01240    }
01241 
01242    // Options
01243    kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;
01244 
01245    // Prepare request
01246    XPClientRequest Request;
01247    memset( &Request, 0, sizeof(Request) );
01248    fConn->SetSID(Request.header.streamid);
01249    Request.sendrcv.requestid = kXP_ping;
01250    Request.sendrcv.sid = fSessionID;
01251    Request.sendrcv.opt = options;
01252    Request.sendrcv.dlen = 0;
01253 
01254    // Send request
01255    Bool_t res = kFALSE;
01256    if (fMode != 'i') {
01257       char *pans = 0;
01258       XrdClientMessage *xrsp =
01259          fConn->SendReq(&Request, (const void *)0, &pans, "Ping");
01260       kXR_int32 *pres = (kXR_int32 *) pans;
01261 
01262       // Get the result
01263       if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
01264          *pres = net2host(*pres);
01265          res = (*pres == 1) ? kTRUE : kFALSE;
01266          // Success: update usage timestamp
01267          Touch();
01268       } else {
01269          // Print error msg, if any
01270          if (fConn->GetLastErr())
01271             Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01272       }
01273 
01274       // Cleanup
01275       SafeDelete(xrsp);
01276 
01277    } else {
01278       if (XPD::clientMarshall(&Request) == 0) {
01279          XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
01280          res = (e == kOK) ? kTRUE : kFALSE;
01281       } else {
01282          Error("Ping", "%p: int: problems marshalling request", this);
01283       }
01284    }
01285 
01286    // Failure notification (avoid using the handler: we may be exiting)
01287    if (!res) {
01288       Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
01289    } else if (gDebug > 0) {
01290       Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
01291    }
01292 
01293    return res;
01294 }
01295 
01296 //______________________________________________________________________________
01297 void TXSocket::RemoteTouch()
01298 {
01299    // Remote touch functionality: contact the server to proof our vitality.
01300    // No reply from server is expected.
01301 
01302    TSystem::ResetErrno();
01303 
01304    if (gDebug > 0)
01305       Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
01306 
01307    // Make sure we are connected
01308    if (!IsValid()) {
01309       Error("RemoteTouch","not connected: nothing to do");
01310       return;
01311    }
01312 
01313    // Prepare request
01314    XPClientRequest Request;
01315    memset( &Request, 0, sizeof(Request) );
01316    fConn->SetSID(Request.header.streamid);
01317    Request.sendrcv.requestid = kXP_touch;
01318    Request.sendrcv.sid = fSessionID;
01319    Request.sendrcv.opt = 0;
01320    Request.sendrcv.dlen = 0;
01321 
01322    // We need the right order
01323    if (XPD::clientMarshall(&Request) != 0) {
01324       Error("Touch", "%p: problems marshalling request ", this);
01325       return;
01326    }
01327    if (fConn->LowWrite(&Request, 0, 0) != kOK)
01328       Error("Touch", "%p: problems sending touch request to server", this);
01329 
01330    // Done
01331    return;
01332 }
01333 
01334 //______________________________________________________________________________
01335 void TXSocket::CtrlC()
01336 {
01337    // Interrupt the remote protocol instance. Used to propagate Ctrl-C.
01338    // No reply from server is expected.
01339 
01340    TSystem::ResetErrno();
01341 
01342    if (gDebug > 0)
01343       Info("CtrlC","%p: sending ctrl-c request to %s", this, GetName());
01344 
01345    // Make sure we are connected
01346    if (!IsValid()) {
01347       Error("CtrlC","not connected: nothing to do");
01348       return;
01349    }
01350 
01351    // Prepare request
01352    XPClientRequest Request;
01353    memset( &Request, 0, sizeof(Request) );
01354    fConn->SetSID(Request.header.streamid);
01355    Request.proof.requestid = kXP_ctrlc;
01356    Request.proof.sid = 0;
01357    Request.proof.dlen = 0;
01358 
01359    // We need the right order
01360    if (XPD::clientMarshall(&Request) != 0) {
01361       Error("CtrlC", "%p: problems marshalling request ", this);
01362       return;
01363    }
01364    if (fConn->LowWrite(&Request, 0, 0) != kOK)
01365       Error("CtrlC", "%p: problems sending ctrl-c request to server", this);
01366 
01367    // Done
01368    return;
01369 }
01370 
01371 //______________________________________________________________________________
01372 Int_t TXSocket::PickUpReady()
01373 {
01374    // Wait and pick-up next buffer from the asynchronous queue
01375 
01376    fBufCur = 0;
01377    fByteLeft = 0;
01378    fByteCur = 0;
01379    if (gDebug > 2)
01380       Info("PickUpReady", "%p: %s: going to sleep", this, GetTitle());
01381 
01382    // User can choose whether to wait forever or for a fixed amount of time
01383    if (!fDontTimeout) {
01384       static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
01385       static Int_t dt = 2000;
01386       Int_t to = timeout;
01387       while (to && !fRDInterrupt) {
01388          if (fASem.Wait(dt) != 0) {
01389             to -= dt;
01390             if (to <= 0) {
01391                Error("PickUpReady","error waiting at semaphore");
01392                return -1;
01393             } else {
01394                if (gDebug > 0)
01395                   Info("PickUpReady", "%p: %s: got timeout: retring (%d secs)",
01396                                       this, GetTitle(), to/1000);
01397             }
01398          } else
01399             break;
01400       }
01401       // We wait forever
01402       if (fRDInterrupt) {
01403          Error("PickUpReady","interrupted");
01404          fRDInterrupt = kFALSE;
01405          return -1;
01406       }
01407    } else {
01408       // We wait forever
01409       if (fASem.Wait() != 0) {
01410          Error("PickUpReady","error waiting at semaphore");
01411          return -1;
01412       }
01413    }
01414    if (gDebug > 2)
01415       Info("PickUpReady", "%p: %s: waken up", this, GetTitle());
01416 
01417    R__LOCKGUARD(fAMtx);
01418 
01419    // Get message, if any
01420    if (fAQue.size() <= 0) {
01421       Error("PickUpReady","queue is empty - protocol error ?");
01422       return -1;
01423    }
01424    fBufCur = fAQue.front();
01425    // Remove message from the queue
01426    fAQue.pop_front();
01427    // Set number of available bytes
01428    if (fBufCur)
01429       fByteLeft = fBufCur->fLen;
01430 
01431    if (gDebug > 2)
01432       Info("PickUpReady", "%p: %s: got message (%d bytes)",
01433                           this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
01434 
01435    // Update counters
01436    fBytesRecv += fBufCur->fLen;
01437 
01438    // Set session ID
01439    if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
01440       SetClientID(fBufCur->fCid);
01441 
01442    // Clean entry in the underlying pipe
01443    fgPipe.Clean(this);
01444 
01445    // We are done
01446    return 0;
01447 }
01448 
01449 //______________________________________________________________________________
01450 TXSockBuf *TXSocket::PopUpSpare(Int_t size)
01451 {
01452    // Pop-up a buffer of at least size bytes from the spare list
01453    // If none is found either one is reallocated or a new one
01454    // created
01455    TXSockBuf *buf = 0;
01456    static Int_t nBuf = 0;
01457 
01458 
01459    R__LOCKGUARD(&fgSMtx);
01460 
01461 
01462    Int_t maxsz = 0;
01463    if (fgSQue.size() > 0) {
01464       list<TXSockBuf *>::iterator i;
01465       for (i = fgSQue.begin(); i != fgSQue.end(); i++) {
01466          maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
01467          if ((*i) && (*i)->fSiz >= size) {
01468             buf = *i;
01469             if (gDebug > 2)
01470                Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
01471                                  size, (int) fgSQue.size(), nBuf, buf, buf->fSiz);
01472             // Drop from this list
01473             fgSQue.erase(i);
01474             return buf;
01475          }
01476       }
01477       // All buffers are too small: enlarge the first one
01478       buf = fgSQue.front();
01479       buf->Resize(size);
01480       if (gDebug > 2)
01481          Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
01482                            size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
01483       // Drop from this list
01484       fgSQue.pop_front();
01485       return buf;
01486    }
01487 
01488    // Create a new buffer
01489    char *b = (char *)malloc(size);
01490    if (b)
01491       buf = new TXSockBuf(b, size);
01492    nBuf++;
01493    if (gDebug > 2)
01494       Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
01495                         size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
01496 
01497    // We are done
01498    return buf;
01499 }
01500 
01501 //______________________________________________________________________________
01502 void TXSocket::PushBackSpare()
01503 {
01504    // Release read buffer giving back to the spare list
01505 
01506    R__LOCKGUARD(&fgSMtx);
01507 
01508    if (gDebug > 2)
01509       Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
01510                            fBufCur, fBufCur->fSiz, TXSockBuf::BuffMem());
01511 
01512    if (TXSockBuf::BuffMem() < TXSockBuf::GetMemMax()) {
01513       fgSQue.push_back(fBufCur);
01514    } else {
01515       delete fBufCur;
01516    }
01517    fBufCur = 0;
01518    fByteCur = 0;
01519    fByteLeft = 0;
01520 }
01521 
01522 //______________________________________________________________________________
01523 Int_t TXSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions)
01524 {
01525    // Receive a raw buffer of specified length bytes.
01526 
01527    // Inputs must make sense
01528    if (!buffer || (length <= 0))
01529       return -1;
01530 
01531    // Wait and pick-up a read buffer if we do not have one
01532    if (!fBufCur && (PickUpReady() != 0))
01533       return -1;
01534 
01535    // Use it
01536    if (fByteLeft >= length) {
01537       memcpy(buffer, fBufCur->fBuf + fByteCur, length);
01538       fByteCur += length;
01539       if ((fByteLeft -= length) <= 0)
01540          // All used: give back
01541          PushBackSpare();
01542       // Success: update usage timestamp
01543       Touch();
01544       return length;
01545    } else {
01546       // Copy the first part
01547       memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
01548       Int_t at = fByteLeft;
01549       Int_t tobecopied = length - fByteLeft;
01550       PushBackSpare();
01551       while (tobecopied > 0) {
01552          // Pick-up next buffer (it may wait inside)
01553          if (PickUpReady() != 0)
01554             return -1;
01555          // Copy the fresh meat
01556          Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
01557          memcpy((void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
01558          fByteCur = ncpy;
01559          if ((fByteLeft -= ncpy) <= 0)
01560             // All used: give back
01561             PushBackSpare();
01562          // Recalculate
01563          tobecopied -= ncpy;
01564          at += ncpy;
01565       }
01566    }
01567 
01568    // Update counters
01569    fBytesRecv  += length;
01570    fgBytesRecv += length;
01571 
01572    // Success: update usage timestamp
01573    Touch();
01574 
01575    return length;
01576 }
01577 
01578 //______________________________________________________________________________
01579 Int_t TXSocket::SendInterrupt(Int_t type)
01580 {
01581    // Send urgent message (interrupt) to remote server
01582    // Returns 0 or -1 in case of error.
01583 
01584    TSystem::ResetErrno();
01585 
01586    // Prepare request
01587    XPClientRequest Request;
01588    memset(&Request, 0, sizeof(Request) );
01589    fConn->SetSID(Request.header.streamid);
01590    if (type == (Int_t) TProof::kShutdownInterrupt)
01591       Request.interrupt.requestid = kXP_destroy;
01592    else
01593       Request.interrupt.requestid = kXP_interrupt;
01594    Request.interrupt.sid = fSessionID;
01595    Request.interrupt.type = type;    // type of interrupt (see TProof::EUrgent)
01596    Request.interrupt.dlen = 0;
01597 
01598    // Send request
01599    XrdClientMessage *xrsp =
01600       fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
01601    if (xrsp) {
01602       // Success: update usage timestamp
01603       Touch();
01604       // Cleanup
01605       SafeDelete(xrsp);
01606       // ok
01607       return 0;
01608    } else {
01609       // Print error msg, if any
01610       if (fConn->GetLastErr())
01611          Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01612    }
01613 
01614    // Failure notification (avoid using the handler: we may be exiting)
01615    Error("SendInterrupt", "problems sending interrupt to server");
01616    return -1;
01617 }
01618 
01619 //______________________________________________________________________________
01620 Int_t TXSocket::Send(const TMessage &mess)
01621 {
01622    // Send a TMessage object. Returns the number of bytes in the TMessage
01623    // that were sent and -1 in case of error.
01624 
01625    TSystem::ResetErrno();
01626 
01627    if (mess.IsReading()) {
01628       Error("Send", "cannot send a message used for reading");
01629       return -1;
01630    }
01631 
01632    // send streamer infos in case schema evolution is enabled in the TMessage
01633    SendStreamerInfos(mess);
01634 
01635    // send the process id's so TRefs work
01636    SendProcessIDs(mess);
01637 
01638    mess.SetLength();   //write length in first word of buffer
01639 
01640    if (fCompress > 0 && mess.GetCompressionLevel() == 0)
01641       const_cast<TMessage&>(mess).SetCompressionLevel(fCompress);
01642 
01643    if (mess.GetCompressionLevel() > 0)
01644       const_cast<TMessage&>(mess).Compress();
01645 
01646    char *mbuf = mess.Buffer();
01647    Int_t mlen = mess.Length();
01648    if (mess.CompBuffer()) {
01649       mbuf = mess.CompBuffer();
01650       mlen = mess.CompLength();
01651    }
01652 
01653    // Parse message type to choose sending options
01654    kXR_int32 fSendOptDefault = fSendOpt;
01655    switch (mess.What()) {
01656       case kPROOF_PROCESS:
01657          fSendOpt |= kXPD_process;
01658          break;
01659       case kPROOF_PROGRESS:
01660       case kPROOF_FEEDBACK:
01661          fSendOpt |= kXPD_fb_prog;
01662          break;
01663       case kPROOF_QUERYSUBMITTED:
01664          fSendOpt |= kXPD_querynum;
01665          fSendOpt |= kXPD_fb_prog;
01666          break;
01667       case kPROOF_STARTPROCESS:
01668          fSendOpt |= kXPD_startprocess;
01669          fSendOpt |= kXPD_fb_prog;
01670          break;
01671       case kPROOF_STOPPROCESS:
01672          fSendOpt |= kXPD_fb_prog;
01673          break;
01674       case kPROOF_SETIDLE:
01675          fSendOpt |= kXPD_setidle;
01676          fSendOpt |= kXPD_fb_prog;
01677          break;
01678       case kPROOF_LOGFILE:
01679       case kPROOF_LOGDONE:
01680          if (GetClientIDSize() <= 1)
01681             fSendOpt |= kXPD_logmsg;
01682          break;
01683       default:
01684          break;
01685    }
01686 
01687    if (gDebug > 2)
01688       Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
01689 
01690    Int_t nsent = SendRaw(mbuf, mlen);
01691    fSendOpt = fSendOptDefault;
01692 
01693    if (nsent <= 0)
01694       return nsent;
01695 
01696    fBytesSent  += nsent;
01697    fgBytesSent += nsent;
01698 
01699    return nsent - sizeof(UInt_t);  //length - length header
01700 }
01701 
01702 //______________________________________________________________________________
01703 Int_t TXSocket::Recv(TMessage *&mess)
01704 {
01705    // Receive a TMessage object. The user must delete the TMessage object.
01706    // Returns length of message in bytes (can be 0 if other side of connection
01707    // is closed) or -1 in case of error or -5 if pipe broken (connection invalid).
01708    // In those case mess == 0.
01709 
01710    TSystem::ResetErrno();
01711 
01712    if (!IsValid()) {
01713       mess = 0;
01714       return -5;
01715    }
01716 
01717 oncemore:
01718    Int_t  n;
01719    UInt_t len;
01720    if ((n = RecvRaw(&len, sizeof(UInt_t))) <= 0) {
01721       mess = 0;
01722       return n;
01723    }
01724    len = net2host(len);  //from network to host byte order
01725 
01726    char *buf = new char[len+sizeof(UInt_t)];
01727    if ((n = RecvRaw(buf+sizeof(UInt_t), len)) <= 0) {
01728       delete [] buf;
01729       mess = 0;
01730       return n;
01731    }
01732 
01733    fBytesRecv  += n + sizeof(UInt_t);
01734    fgBytesRecv += n + sizeof(UInt_t);
01735 
01736    mess = new TMessage(buf, len+sizeof(UInt_t));
01737 
01738    // receive any streamer infos
01739    if (RecvStreamerInfos(mess))
01740       goto oncemore;
01741 
01742    // receive any process ids
01743    if (RecvProcessIDs(mess))
01744       goto oncemore;
01745 
01746    if (mess->What() & kMESS_ACK) {
01747       // Acknowledgement embedded: ignore ...
01748       mess->SetWhat(mess->What() & ~kMESS_ACK);
01749    }
01750 
01751    return n;
01752 }
01753 
01754 //______________________________________________________________________________
01755 TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
01756                                       Long64_t l64, Int_t int3, const char *)
01757 {
01758    // Send message to intermediate coordinator.
01759    // If any output is due, this is returned as an obj string to be
01760    // deleted by the caller
01761 
01762    TObjString *sout = 0;
01763 
01764    // We fill the header struct containing the request
01765    XPClientRequest reqhdr;
01766    const void *buf = 0;
01767    char *bout = 0;
01768    char **vout = 0;
01769    memset(&reqhdr, 0, sizeof(reqhdr));
01770    fConn->SetSID(reqhdr.header.streamid);
01771    reqhdr.header.requestid = kXP_admin;
01772    reqhdr.proof.int1 = kind;
01773    reqhdr.proof.int2 = int2;
01774    switch (kind) {
01775       case kQueryROOTVersions:
01776       case kQuerySessions:
01777       case kQueryWorkers:
01778          reqhdr.proof.sid = 0;
01779          reqhdr.header.dlen = 0;
01780          vout = (char **)&bout;
01781          break;
01782       case kCleanupSessions:
01783          reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
01784                                          : (kXR_int32) kXPD_TopMaster;
01785          reqhdr.proof.int3 = int2;
01786          reqhdr.proof.sid = fSessionID;
01787          reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01788          buf = (msg) ? (const void *)msg : buf;
01789          break;
01790       case kCpFile:
01791       case kGetFile:
01792       case kPutFile:
01793       case kExec:
01794          reqhdr.proof.sid = fSessionID;
01795          reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01796          buf = (msg) ? (const void *)msg : buf;
01797          vout = (char **)&bout;
01798          break;
01799       case kQueryLogPaths:
01800          vout = (char **)&bout;
01801       case kReleaseWorker:
01802       case kSendMsgToUser:
01803       case kGroupProperties:
01804       case kSessionTag:
01805       case kSessionAlias:
01806          reqhdr.proof.sid = fSessionID;
01807          reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01808          buf = (msg) ? (const void *)msg : buf;
01809          break;
01810       case kROOTVersion:
01811          reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01812          buf = (msg) ? (const void *)msg : buf;
01813          break;
01814       case kGetWorkers:
01815          reqhdr.proof.sid = fSessionID;
01816          reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
01817          if (msg)
01818             buf = (const void *)msg;
01819          vout = (char **)&bout;
01820          break;
01821       case kReadBuffer:
01822          reqhdr.header.requestid = kXP_readbuf;
01823          reqhdr.readbuf.ofs = l64;
01824          reqhdr.readbuf.len = int2;
01825          if (int3 > 0 && fXrdProofdVersion < 1003) {
01826             Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
01827                  " grep functionality not supported", fXrdProofdVersion);
01828             return sout;
01829          }
01830          reqhdr.readbuf.int1 = int3;
01831          if (!msg || strlen(msg) <= 0) {
01832             Info("SendCoordinator", "kReadBuffer: file path undefined");
01833             return sout;
01834          }
01835          reqhdr.header.dlen = strlen(msg);
01836          buf = (const void *)msg;
01837          vout = (char **)&bout;
01838          break;
01839       default:
01840          Info("SendCoordinator", "unknown message kind: %d", kind);
01841          return sout;
01842    }
01843 
01844    // server response header
01845    Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
01846    XrdClientMessage *xrsp =
01847       fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator", noterr);
01848 
01849    // If positive answer
01850    if (xrsp) {
01851       // Check if we need to create an output string
01852       if (bout && (xrsp->DataLen() > 0))
01853          sout = new TObjString(TString(bout,xrsp->DataLen()));
01854       if (bout)
01855          free(bout);
01856       // Success: update usage timestamp
01857       Touch();
01858       SafeDelete(xrsp);
01859    } else {
01860       // Print error msg, if any
01861       if (fConn->GetLastErr())
01862          Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01863    }
01864 
01865    // Failure notification (avoid using the handler: we may be exiting)
01866    return sout;
01867 }
01868 
01869 //______________________________________________________________________________
01870 void TXSocket::SendUrgent(Int_t type, Int_t int1, Int_t int2)
01871 {
01872    // Send urgent message to counterpart; 'type' specifies the type of
01873    // the message (see TXSocket::EUrgentMsgType), and 'int1', 'int2'
01874    // two containers for additional information.
01875 
01876    TSystem::ResetErrno();
01877 
01878    // Prepare request
01879    XPClientRequest Request;
01880    memset(&Request, 0, sizeof(Request) );
01881    fConn->SetSID(Request.header.streamid);
01882    Request.proof.requestid = kXP_urgent;
01883    Request.proof.sid = fSessionID;
01884    Request.proof.int1 = type;    // type of urgent msg (see TXSocket::EUrgentMsgType)
01885    Request.proof.int2 = int1;    // 4-byte container info 1
01886    Request.proof.int3 = int2;    // 4-byte container info 2
01887    Request.proof.dlen = 0;
01888 
01889    // Send request
01890    XrdClientMessage *xrsp =
01891       fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
01892    if (xrsp) {
01893       // Success: update usage timestamp
01894       Touch();
01895       // Cleanup
01896       SafeDelete(xrsp);
01897    } else {
01898       // Print error msg, if any
01899       if (fConn->GetLastErr())
01900          Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
01901    }
01902 
01903    // Done
01904    return;
01905 }
01906 
01907 //_____________________________________________________________________________
01908 void TXSocket::InitEnvs()
01909 {
01910    // Init environment variables for XrdClient
01911 
01912    // Set debug level
01913    Int_t deb = gEnv->GetValue("XProof.Debug", -1);
01914    EnvPutInt(NAME_DEBUG, deb);
01915    if (deb > 0) {
01916       XrdProofdTrace->What |= TRACE_REQ;
01917       if (deb > 1) {
01918          XrdProofdTrace->What |= TRACE_DBG;
01919          if (deb > 2)
01920             XrdProofdTrace->What |= TRACE_ALL;
01921       }
01922    }
01923    const char *cenv = 0;
01924 
01925    // List of domains where connection is allowed
01926    TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
01927    if (allowCO.Length() > 0)
01928       EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
01929 
01930    // List of domains where connection is denied
01931    TString denyCO  = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
01932    if (denyCO.Length() > 0)
01933       EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
01934 
01935    // Max number of retries on first connect and related timeout
01936    XrdProofConn::SetRetryParam(-1, -1);
01937    Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
01938    EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
01939    Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
01940    EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
01941 
01942    // Reconnect Wait
01943    Int_t recoTO = gEnv->GetValue("XProof.ReconnectWait",
01944                                   DFLT_RECONNECTWAIT);
01945    if (recoTO == DFLT_RECONNECTWAIT) {
01946       // Check also the old variable name
01947       recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
01948                                   DFLT_RECONNECTWAIT);
01949    }
01950    EnvPutInt(NAME_RECONNECTWAIT, recoTO);
01951 
01952    // Request Timeout
01953    Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
01954    EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
01955 
01956    // No automatic proofd backward-compatibility
01957    EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
01958 
01959    // Dynamic forwarding (SOCKS4)
01960    TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
01961    Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
01962    if (socks4Port > 0) {
01963       if (socks4Host.IsNull())
01964          // Default
01965          socks4Host = "127.0.0.1";
01966       EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
01967       EnvPutInt(NAME_SOCKS4PORT, socks4Port);
01968    }
01969 
01970    // For password-based authentication
01971    TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
01972    if (autolog.Length() > 0 &&
01973       (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
01974       gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
01975 
01976    // For password-based authentication
01977    TString netrc;
01978    netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
01979    gSystem->Setenv("XrdSecNETRC", netrc.Data());
01980 
01981    TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
01982    if (alogfile.Length() > 0)
01983       gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
01984 
01985    TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
01986    if (verisrv.Length() > 0 &&
01987       (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
01988       gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
01989 
01990    TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
01991    if (srvpuk.Length() > 0)
01992       gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
01993 
01994    // For GSI authentication
01995    TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
01996    if (cadir.Length() > 0)
01997       gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
01998 
01999    TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
02000    if (crldir.Length() > 0)
02001       gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
02002 
02003    TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
02004    if (crlext.Length() > 0)
02005       gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
02006 
02007    TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
02008    if (ucert.Length() > 0)
02009       gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
02010 
02011    TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
02012    if (ukey.Length() > 0)
02013       gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
02014 
02015    TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
02016    if (upxy.Length() > 0)
02017       gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
02018 
02019    TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
02020    if (valid.Length() > 0)
02021       gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
02022 
02023    TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
02024    if (deplen.Length() > 0 &&
02025       (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
02026       gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
02027 
02028    TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
02029    if (pxybits.Length() > 0)
02030       gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
02031 
02032    TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
02033    if (crlcheck.Length() > 0 &&
02034       (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
02035       gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
02036 
02037    TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
02038    if (delegpxy.Length() > 0 &&
02039       (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
02040       gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
02041 
02042    TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
02043    if (signpxy.Length() > 0 &&
02044       (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
02045       gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
02046 
02047    // Print the tag, if required (only once)
02048    if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
02049       ::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
02050             gROOT->GetVersion());
02051 
02052    // Only once
02053    fgInitDone = kTRUE;
02054 }
02055 
02056 //______________________________________________________________________________
02057 Int_t TXSocket::Reconnect()
02058 {
02059    // Try reconnection after failure
02060 
02061    if (gDebug > 0) {
02062       Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
02063                         this, fConn, (fConn ? fConn->IsValid() : 0),
02064                         fUrl.Data(), fConn->GetLogConnID());
02065    }
02066 
02067    if (fXrdProofdVersion < 1005) {
02068       Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
02069                        this, fXrdProofdVersion);
02070       return -1;
02071    }
02072 
02073    if (fConn) {
02074       if (gDebug > 0)
02075          Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
02076       fConn->ReConnect();
02077       if (fConn->IsValid()) {
02078          // Create new proofserv if not client manager or administrator or internal mode
02079          if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
02080             // We attach or create
02081             if (!Create(kTRUE)) {
02082                // Failure
02083                Error("TXSocket", "create or attach failed (%s)",
02084                      ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
02085                Close();
02086                return -1;
02087             }
02088          }
02089       }
02090    }
02091 
02092    if (gDebug > 0) {
02093       Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
02094                         ((fConn && fConn->IsValid()) ? "succeeded!" : "failed"),
02095                         fConn->GetLogConnID() );
02096    }
02097 
02098    // Done
02099    return ((fConn && fConn->IsValid()) ? 0 : -1);
02100 }
02101 
02102 //_____________________________________________________________________________
02103 TXSockBuf::TXSockBuf(Char_t *bp, Int_t sz, Bool_t own)
02104 {
02105    //constructor
02106    fBuf = fMem = bp;
02107    fSiz = fLen = sz;
02108    fOwn = own;
02109    fCid = -1;
02110    fgBuffMem += sz;
02111 }
02112 
02113 //_____________________________________________________________________________
02114 TXSockBuf::~TXSockBuf()
02115 {
02116    //destructor
02117    if (fOwn && fMem) {
02118       free(fMem);
02119       fgBuffMem -= fSiz;
02120    }
02121 }
02122 
02123 //_____________________________________________________________________________
02124 void TXSockBuf::Resize(Int_t sz)
02125 {
02126    //resize socket buffer
02127    if (sz > fSiz) {
02128       if ((fMem = (Char_t *)realloc(fMem, sz))) {
02129          fgBuffMem += (sz - fSiz);
02130          fBuf = fMem;
02131          fSiz = sz;
02132          fLen = 0;
02133       }
02134    }
02135 }
02136 
02137 //_____________________________________________________________________________
02138 //
02139 // TXSockBuf static methods
02140 //
02141 
02142 //_____________________________________________________________________________
02143 Long64_t TXSockBuf::BuffMem()
02144 {
02145    // Return the currently allocated memory
02146 
02147    return fgBuffMem;
02148 }
02149 
02150 //_____________________________________________________________________________
02151 Long64_t TXSockBuf::GetMemMax()
02152 {
02153    // Return the max allocated memory allowed
02154 
02155    return fgMemMax;
02156 }
02157 
02158 //_____________________________________________________________________________
02159 void TXSockBuf::SetMemMax(Long64_t memmax)
02160 {
02161    // Return the max allocated memory allowed
02162 
02163    fgMemMax = memmax > 0 ? memmax : fgMemMax;
02164 }
02165 
02166 //_____________________________________________________________________________
02167 //
02168 // TXSockPipe
02169 //
02170 
02171 //_____________________________________________________________________________
02172 TXSockPipe::TXSockPipe(const char *loc) : fMutex(kTRUE), fLoc(loc)
02173 {
02174    // Constructor
02175 
02176    // Create the pipe
02177    if (pipe(fPipe) != 0) {
02178       Printf("TXSockPipe: problem initializing pipe for socket inputs");
02179       fPipe[0] = -1;
02180       fPipe[1] = -1;
02181       return;
02182    }
02183 }
02184 
02185 //_____________________________________________________________________________
02186 TXSockPipe::~TXSockPipe()
02187 {
02188    // Destructor
02189 
02190    if (fPipe[0] >= 0) close(fPipe[0]);
02191    if (fPipe[1] >= 0) close(fPipe[1]);
02192 }
02193 
02194 
02195 //____________________________________________________________________________
02196 Int_t TXSockPipe::Post(TSocket *s)
02197 {
02198    // Write a byte to the global pipe to signal new availibility of
02199    // new messages
02200 
02201    if (!IsValid() || !s) return -1;
02202 
02203    // This must be an atomic action
02204    Int_t sz = 0;
02205    {  R__LOCKGUARD(&fMutex);
02206       // Add this one
02207       fReadySock.Add(s);
02208 
02209       // Only one char
02210       Char_t c = 1;
02211       if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
02212          Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
02213          return -1;
02214       }
02215       if (gDebug > 2) sz = fReadySock.GetSize();
02216    }
02217 
02218    if (gDebug > 2)
02219       Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d)",
02220                                fLoc.Data(), s, sz);
02221    // We are done
02222    return 0;
02223 }
02224 
02225 //____________________________________________________________________________
02226 Int_t TXSockPipe::Clean(TSocket *s)
02227 {
02228    // Read a byte to the global pipe to synchronize message pickup
02229 
02230    // Pipe must have been created
02231    if (!IsValid() || !s) return -1;
02232 
02233    // Only one char
02234    Int_t sz = 0;
02235    Char_t c = 0;
02236    { R__LOCKGUARD(&fMutex);
02237       if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) {
02238          Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
02239          return -1;
02240       }
02241       // Remove this one
02242       fReadySock.Remove(s);
02243 
02244       if (gDebug > 2) sz = fReadySock.GetSize();
02245    }
02246 
02247    if (gDebug > 2)
02248       Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d)",
02249                                fLoc.Data(), s, sz);
02250 
02251    // We are done
02252    return 0;
02253 }
02254 
02255 //____________________________________________________________________________
02256 Int_t TXSockPipe::Flush(TSocket *s)
02257 {
02258    // Remove any reference to socket 's' from the global pipe and
02259    // ready-socket queue
02260 
02261    // Pipe must have been created
02262    if (!IsValid() || !s) return -1;
02263 
02264    TObject *o = 0;
02265    // This must be an atomic action
02266    {  R__LOCKGUARD(&fMutex);
02267       o = fReadySock.FindObject(s);
02268 
02269       while (o) {
02270          // Remove from the list
02271          fReadySock.Remove(s);
02272          o = fReadySock.FindObject(s);
02273          // Remove one notification from the pipe
02274          Char_t c = 0;
02275          if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1)
02276             Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
02277       }
02278    }
02279    // Flush also the socket
02280    ((TXSocket *)s)->Flush();
02281 
02282    // Notify
02283    if (gDebug > 0)
02284       Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
02285 
02286    // We are done
02287    return 0;
02288 }
02289 
02290 //______________________________________________________________________________
02291 void TXSockPipe::DumpReadySock()
02292 {
02293    // Dump content of the ready socket list
02294 
02295    R__LOCKGUARD(&fMutex);
02296 
02297    TString buf = Form("%d |", fReadySock.GetSize());
02298    TIter nxs(&fReadySock);
02299    TObject *o = 0;
02300    while ((o = nxs()))
02301       buf += Form(" %p",o);
02302    Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
02303 }
02304 
02305 //______________________________________________________________________________
02306 TXSocket *TXSockPipe::GetLastReady()
02307 {
02308    // Return last ready socket
02309 
02310    R__LOCKGUARD(&fMutex);
02311 
02312    return (TXSocket *) fReadySock.Last();
02313 }

Generated on Tue Jul 5 14:52:28 2011 for ROOT_528-00b_version by  doxygen 1.5.1