TXSocket.h

Go to the documentation of this file.
00001 // @(#)root/proofx:$Id: TXSocket.h 34428 2010-07-15 12:35:34Z ganis $
00002 // Author: G. Ganis Oct 2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 #ifndef ROOT_TXSocket
00013 #define ROOT_TXSocket
00014 
00015 //////////////////////////////////////////////////////////////////////////
00016 //                                                                      //
00017 // TXSocket                                                             //
00018 //                                                                      //
00019 // High level handler of connections to xproofd.                        //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 
00023 #define DFLT_CONNECTMAXTRY           10
00024 
00025 #ifndef ROOT_TMutex
00026 #include "TMutex.h"
00027 #endif
00028 #ifndef ROOT_TSemaphore
00029 #include "TSemaphore.h"
00030 #endif
00031 #ifndef ROOT_TString
00032 #include "TString.h"
00033 #endif
00034 #ifndef ROOT_TList
00035 #include "TList.h"
00036 #endif
00037 #ifndef ROOT_TMessage
00038 #include "TMessage.h"
00039 #endif
00040 #ifndef ROOT_TUrl
00041 #include "TUrl.h"
00042 #endif
00043 #ifndef ROOT_TSocket
00044 #include "TSocket.h"
00045 #endif
00046 #ifndef ROOT_XrdProofConn
00047 #include "XrdProofConn.h"
00048 #endif
00049 #ifndef XRC_UNSOLMSG_H
00050 #include "XrdClient/XrdClientUnsolMsg.hh"
00051 #endif
00052 
00053 #include <list>
00054 
00055 class TObjString;
00056 class TXSockBuf;
00057 class TXSockPipe;
00058 class TXHandler;
00059 class TXSocketHandler;
00060 class XrdClientMessage;
00061 
00062 // To transmit info to Handlers
00063 typedef struct {
00064    Int_t   fInt1;
00065    Int_t   fInt2;
00066    Int_t   fInt3;
00067    Int_t   fInt4;
00068 } XHandleIn_t;
00069 typedef struct {
00070    Int_t   fOpt;
00071    const char *fMsg;
00072 } XHandleErr_t;
00073 
00074 class TXSocket  : public TSocket, public XrdClientAbsUnsolMsgHandler {
00075 
00076 friend class TXProofMgr;
00077 friend class TXProofServ;
00078 friend class TXSlave;
00079 friend class TXSocketHandler;
00080 friend class TXSockPipe;
00081 friend class TXUnixSocket;
00082 
00083 private:
00084    char                fMode;          // 'e' (def) or 'i' (internal - proofsrv)
00085    kXR_int32           fSendOpt;       // Options for sending messages
00086    Short_t             fSessionID;     // proofsrv: remote ID of connected session
00087    TString             fUser;          // Username used for login
00088    TString             fHost;          // Remote host
00089    Int_t               fPort;          // Remote port
00090 
00091    Int_t               fLogLevel;      // Log level to be transmitted to servers
00092 
00093    TString             fBuffer;        // Container for exchanging information
00094    TObject            *fReference;     // Generic object reference of this socket
00095    TXHandler          *fHandler;       // Handler of asynchronous events (input, error)
00096 
00097    XrdProofConn       *fConn;          // instance of the underlying connection module
00098 
00099    // Asynchronous messages
00100    TSemaphore          fASem;          // Control access to conn async msg queue
00101    TMutex             *fAMtx;          // To protect async msg queue
00102    std::list<TXSockBuf *> fAQue;          // list of asynchronous messages
00103    Int_t               fByteLeft;      // bytes left in the first buffer
00104    Int_t               fByteCur;       // current position in the first buffer
00105    TXSockBuf          *fBufCur;        // current read buffer
00106 
00107    TSemaphore          fAsynProc;      // Control actions while processing async messages
00108 
00109    // Interrupts
00110    TMutex             *fIMtx;          // To protect interrupt queue
00111    kXR_int32           fILev;          // Highest received interrupt
00112    Bool_t              fIForward;      // Wheter the interrupt should be propagated
00113 
00114    // Process ID of the instatiating process (to signal interrupts)
00115    Int_t               fPid;
00116 
00117    // Whether to timeout or not
00118    Bool_t              fDontTimeout;   // If true wait forever for incoming messages
00119    Bool_t              fRDInterrupt;   // To interrupt waiting for messages
00120 
00121    // Version of the remote XrdProofdProtocol
00122    Int_t               fXrdProofdVersion;
00123 
00124    // Static area for input handling
00125    static TXSockPipe   fgPipe;         //  Pipe for input monitoring
00126    static TString      fgLoc;          // Location string
00127    static Bool_t       fgInitDone;     // Avoid initializing more than once
00128 
00129    // List of spare buffers
00130    static TMutex       fgSMtx;          // To protect spare list
00131    static std::list<TXSockBuf *> fgSQue; // list of spare buffers
00132 
00133    // Manage asynchronous message
00134    Int_t               PickUpReady();
00135    TXSockBuf          *PopUpSpare(Int_t sz);
00136    void                PushBackSpare();
00137 
00138    // Post a message into the queue for asynchronous processing
00139    void                PostMsg(Int_t type, const char *msg = 0);
00140 
00141    // Auxilliary
00142    Int_t               GetLowSocket() const { return (fConn ? fConn->GetLowSocket() : -1); }
00143 
00144    static void         SetLocation(const char *loc = ""); // Set location string
00145 
00146    static void         InitEnvs(); // Initialize environment variables
00147 
00148 public:
00149    // Should be the same as in proofd/src/XrdProofdProtocol::Urgent
00150    enum EUrgentMsgType { kStopProcess = 2000 };
00151 
00152    TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
00153             const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
00154    TXSocket(const TXSocket &xs);
00155    TXSocket& operator=(const TXSocket& xs);
00156    virtual ~TXSocket();
00157 
00158    virtual void        Close(Option_t *opt = "");
00159    Bool_t              Create(Bool_t attach = kFALSE);
00160    void                DisconnectSession(Int_t id, Option_t *opt = "");
00161 
00162    void                DoError(int level,
00163                                const char *location, const char *fmt, va_list va) const;
00164 
00165    virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s,
00166                                                      XrdClientMessage *msg);
00167 
00168    virtual Int_t       GetClientID() const { return -1; }
00169    virtual Int_t       GetClientIDSize() const { return 1; }
00170    Int_t               GetLogConnID() const { return (fConn ? fConn->GetLogConnID() : -1); }
00171    Int_t               GetOpenError() const { return (fConn ? fConn->GetOpenError() : -1); }
00172    Int_t               GetServType() const { return (fConn ? fConn->GetServType() : -1); }
00173    Int_t               GetSessionID() const { return (fConn ? fConn->GetSessionID() : -1); }
00174    Int_t               GetXrdProofdVersion() const { return fXrdProofdVersion; }
00175 
00176    Bool_t              IsValid() const { return (fConn ? (fConn->IsValid()) : kFALSE); }
00177    Bool_t              IsServProofd();
00178    virtual void        RemoveClientID() { }
00179    virtual void        SetClientID(Int_t) { }
00180    void                SetSendOpt(ESendRecvOptions o) { fSendOpt = o; }
00181    void                SetSessionID(Int_t id);
00182 
00183    // Send interfaces
00184    Int_t               Send(const TMessage &mess);
00185    Int_t               Send(Int_t kind) { return TSocket::Send(kind); }
00186    Int_t               Send(Int_t status, Int_t kind)
00187                                         { return TSocket::Send(status, kind); }
00188    Int_t               Send(const char *mess, Int_t kind = kMESS_STRING)
00189                                         { return TSocket::Send(mess, kind); }
00190    Int_t               SendRaw(const void *buf, Int_t len,
00191                                ESendRecvOptions opt = kDontBlock);
00192 
00193    TObjString         *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
00194                                        Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);
00195 
00196    // Recv interfaces
00197    Int_t               Recv(TMessage *&mess);
00198    Int_t               Recv(Int_t &status, Int_t &kind)
00199                                         { return TSocket::Recv(status, kind); }
00200    Int_t               Recv(char *mess, Int_t max)
00201                                         { return TSocket::Recv(mess, max); }
00202    Int_t               Recv(char *mess, Int_t max, Int_t &kind)
00203                                         { return TSocket::Recv(mess, max, kind); }
00204    Int_t               RecvRaw(void *buf, Int_t len,
00205                                ESendRecvOptions opt = kDefault);
00206 
00207    // Interrupts
00208    Int_t               SendInterrupt(Int_t type);
00209    Int_t               GetInterrupt(Bool_t &forward);
00210 
00211    // Urgent message
00212    void                SendUrgent(Int_t type, Int_t int1, Int_t int2);
00213 
00214    // Interrupt the low level socket
00215    void                SetInterrupt() { fRDInterrupt = kTRUE;
00216                                         if (fConn) fConn->SetInterrupt(); }
00217 
00218    // Flush the asynchronous queue
00219    Int_t               Flush();
00220 
00221    // Ping the counterpart
00222    Bool_t              Ping(const char *ord = 0);
00223 
00224    // Request remote touch of the admin file associated with this connection
00225    void                RemoteTouch();
00226    // Propagate a Ctrl-C
00227    void                CtrlC();
00228 
00229    // Standard options cannot be set
00230    Int_t               SetOption(ESockOptions, Int_t) { return 0; }
00231 
00232    // Disable / Enable read timeout
00233    void                DisableTimeout() { fDontTimeout = kTRUE; }
00234    void                EnableTimeout() { fDontTimeout = kFALSE; }
00235 
00236    // Try reconnection after error
00237    virtual Int_t       Reconnect();
00238 
00239    ClassDef(TXSocket, 0) //A high level connection class for PROOF
00240 };
00241 
00242 
00243 //
00244 // The following structure is used to store buffers received asynchronously
00245 //
00246 class TXSockBuf {
00247 public:
00248    Int_t   fSiz;
00249    Int_t   fLen;
00250    Char_t *fBuf;
00251    Bool_t  fOwn;
00252    Int_t   fCid;
00253 
00254    TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
00255   ~TXSockBuf();
00256 
00257    void Resize(Int_t sz);
00258 
00259    static Long64_t BuffMem();
00260    static Long64_t GetMemMax();
00261    static void     SetMemMax(Long64_t memmax);
00262 
00263 private:
00264    Char_t *fMem;
00265    static Long64_t fgBuffMem; // Total allocated memory
00266    static Long64_t fgMemMax;  // Max allocated memory allowed
00267 };
00268 
00269 //
00270 // The following class describes internal pipes
00271 //
00272 class TXSockPipe {
00273 public:
00274 
00275    TXSockPipe(const char *loc = "");
00276    virtual ~TXSockPipe();
00277 
00278    Bool_t       IsValid() const { return ((fPipe[0] >= 0 && fPipe[1] >= 0) ? kTRUE : kFALSE); }
00279 
00280    TXSocket    *GetLastReady();
00281 
00282    Int_t        GetRead() const { return fPipe[0]; }
00283    Int_t        Post(TSocket *s);  // Notify socket ready via global pipe
00284    Int_t        Clean(TSocket *s); // Clean previous pipe notification
00285    Int_t        Flush(TSocket *s); // Remove any instance of 's' from the pipe
00286    void         DumpReadySock();
00287 
00288    void         SetLoc(const char *loc = "") { fLoc = loc; }
00289 
00290 private:
00291    TMutex       fMutex;     // Protect access to the sockets-ready list
00292    Int_t        fPipe[2];   // Pipe for input monitoring
00293    TString      fLoc;       // Location string
00294    TList        fReadySock;    // List of sockets ready to be read
00295 };
00296 
00297 //
00298 // Guard for a semaphore
00299 //
00300 class TXSemaphoreGuard {
00301 public:
00302 
00303    TXSemaphoreGuard(TSemaphore *sem) : fSem(sem), fValid(kTRUE) { if (!fSem || fSem->TryWait()) fValid = kFALSE; }
00304    virtual ~TXSemaphoreGuard() { if (fValid && fSem) fSem->Post(); }
00305 
00306    Bool_t       IsValid() const { return fValid; }
00307 
00308 private:
00309    TSemaphore  *fSem;
00310    Bool_t       fValid;
00311 };
00312 
00313 #endif

Generated on Tue Jul 5 14:52:27 2011 for ROOT_528-00b_version by  doxygen 1.5.1