00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #ifndef ROOT_TXSocket
00013 #define ROOT_TXSocket
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #define DFLT_CONNECTMAXTRY 10
00024
00025 #ifndef ROOT_TMutex
00026 #include "TMutex.h"
00027 #endif
00028 #ifndef ROOT_TSemaphore
00029 #include "TSemaphore.h"
00030 #endif
00031 #ifndef ROOT_TString
00032 #include "TString.h"
00033 #endif
00034 #ifndef ROOT_TList
00035 #include "TList.h"
00036 #endif
00037 #ifndef ROOT_TMessage
00038 #include "TMessage.h"
00039 #endif
00040 #ifndef ROOT_TUrl
00041 #include "TUrl.h"
00042 #endif
00043 #ifndef ROOT_TSocket
00044 #include "TSocket.h"
00045 #endif
00046 #ifndef ROOT_XrdProofConn
00047 #include "XrdProofConn.h"
00048 #endif
00049 #ifndef XRC_UNSOLMSG_H
00050 #include "XrdClient/XrdClientUnsolMsg.hh"
00051 #endif
00052
00053 #include <list>
00054
00055 class TObjString;
00056 class TXSockBuf;
00057 class TXSockPipe;
00058 class TXHandler;
00059 class TXSocketHandler;
00060 class XrdClientMessage;
00061
00062
00063 typedef struct {
00064 Int_t fInt1;
00065 Int_t fInt2;
00066 Int_t fInt3;
00067 Int_t fInt4;
00068 } XHandleIn_t;
00069 typedef struct {
00070 Int_t fOpt;
00071 const char *fMsg;
00072 } XHandleErr_t;
00073
00074 class TXSocket : public TSocket, public XrdClientAbsUnsolMsgHandler {
00075
00076 friend class TXProofMgr;
00077 friend class TXProofServ;
00078 friend class TXSlave;
00079 friend class TXSocketHandler;
00080 friend class TXSockPipe;
00081 friend class TXUnixSocket;
00082
00083 private:
00084 char fMode;
00085 kXR_int32 fSendOpt;
00086 Short_t fSessionID;
00087 TString fUser;
00088 TString fHost;
00089 Int_t fPort;
00090
00091 Int_t fLogLevel;
00092
00093 TString fBuffer;
00094 TObject *fReference;
00095 TXHandler *fHandler;
00096
00097 XrdProofConn *fConn;
00098
00099
00100 TSemaphore fASem;
00101 TMutex *fAMtx;
00102 std::list<TXSockBuf *> fAQue;
00103 Int_t fByteLeft;
00104 Int_t fByteCur;
00105 TXSockBuf *fBufCur;
00106
00107 TSemaphore fAsynProc;
00108
00109
00110 TMutex *fIMtx;
00111 kXR_int32 fILev;
00112 Bool_t fIForward;
00113
00114
00115 Int_t fPid;
00116
00117
00118 Bool_t fDontTimeout;
00119 Bool_t fRDInterrupt;
00120
00121
00122 Int_t fXrdProofdVersion;
00123
00124
00125 static TXSockPipe fgPipe;
00126 static TString fgLoc;
00127 static Bool_t fgInitDone;
00128
00129
00130 static TMutex fgSMtx;
00131 static std::list<TXSockBuf *> fgSQue;
00132
00133
00134 Int_t PickUpReady();
00135 TXSockBuf *PopUpSpare(Int_t sz);
00136 void PushBackSpare();
00137
00138
00139 void PostMsg(Int_t type, const char *msg = 0);
00140
00141
00142 Int_t GetLowSocket() const { return (fConn ? fConn->GetLowSocket() : -1); }
00143
00144 static void SetLocation(const char *loc = "");
00145
00146 static void InitEnvs();
00147
00148 public:
00149
00150 enum EUrgentMsgType { kStopProcess = 2000 };
00151
00152 TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
00153 const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
00154 TXSocket(const TXSocket &xs);
00155 TXSocket& operator=(const TXSocket& xs);
00156 virtual ~TXSocket();
00157
00158 virtual void Close(Option_t *opt = "");
00159 Bool_t Create(Bool_t attach = kFALSE);
00160 void DisconnectSession(Int_t id, Option_t *opt = "");
00161
00162 void DoError(int level,
00163 const char *location, const char *fmt, va_list va) const;
00164
00165 virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s,
00166 XrdClientMessage *msg);
00167
00168 virtual Int_t GetClientID() const { return -1; }
00169 virtual Int_t GetClientIDSize() const { return 1; }
00170 Int_t GetLogConnID() const { return (fConn ? fConn->GetLogConnID() : -1); }
00171 Int_t GetOpenError() const { return (fConn ? fConn->GetOpenError() : -1); }
00172 Int_t GetServType() const { return (fConn ? fConn->GetServType() : -1); }
00173 Int_t GetSessionID() const { return (fConn ? fConn->GetSessionID() : -1); }
00174 Int_t GetXrdProofdVersion() const { return fXrdProofdVersion; }
00175
00176 Bool_t IsValid() const { return (fConn ? (fConn->IsValid()) : kFALSE); }
00177 Bool_t IsServProofd();
00178 virtual void RemoveClientID() { }
00179 virtual void SetClientID(Int_t) { }
00180 void SetSendOpt(ESendRecvOptions o) { fSendOpt = o; }
00181 void SetSessionID(Int_t id);
00182
00183
00184 Int_t Send(const TMessage &mess);
00185 Int_t Send(Int_t kind) { return TSocket::Send(kind); }
00186 Int_t Send(Int_t status, Int_t kind)
00187 { return TSocket::Send(status, kind); }
00188 Int_t Send(const char *mess, Int_t kind = kMESS_STRING)
00189 { return TSocket::Send(mess, kind); }
00190 Int_t SendRaw(const void *buf, Int_t len,
00191 ESendRecvOptions opt = kDontBlock);
00192
00193 TObjString *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
00194 Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);
00195
00196
00197 Int_t Recv(TMessage *&mess);
00198 Int_t Recv(Int_t &status, Int_t &kind)
00199 { return TSocket::Recv(status, kind); }
00200 Int_t Recv(char *mess, Int_t max)
00201 { return TSocket::Recv(mess, max); }
00202 Int_t Recv(char *mess, Int_t max, Int_t &kind)
00203 { return TSocket::Recv(mess, max, kind); }
00204 Int_t RecvRaw(void *buf, Int_t len,
00205 ESendRecvOptions opt = kDefault);
00206
00207
00208 Int_t SendInterrupt(Int_t type);
00209 Int_t GetInterrupt(Bool_t &forward);
00210
00211
00212 void SendUrgent(Int_t type, Int_t int1, Int_t int2);
00213
00214
00215 void SetInterrupt() { fRDInterrupt = kTRUE;
00216 if (fConn) fConn->SetInterrupt(); }
00217
00218
00219 Int_t Flush();
00220
00221
00222 Bool_t Ping(const char *ord = 0);
00223
00224
00225 void RemoteTouch();
00226
00227 void CtrlC();
00228
00229
00230 Int_t SetOption(ESockOptions, Int_t) { return 0; }
00231
00232
00233 void DisableTimeout() { fDontTimeout = kTRUE; }
00234 void EnableTimeout() { fDontTimeout = kFALSE; }
00235
00236
00237 virtual Int_t Reconnect();
00238
00239 ClassDef(TXSocket, 0)
00240 };
00241
00242
00243
00244
00245
00246 class TXSockBuf {
00247 public:
00248 Int_t fSiz;
00249 Int_t fLen;
00250 Char_t *fBuf;
00251 Bool_t fOwn;
00252 Int_t fCid;
00253
00254 TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
00255 ~TXSockBuf();
00256
00257 void Resize(Int_t sz);
00258
00259 static Long64_t BuffMem();
00260 static Long64_t GetMemMax();
00261 static void SetMemMax(Long64_t memmax);
00262
00263 private:
00264 Char_t *fMem;
00265 static Long64_t fgBuffMem;
00266 static Long64_t fgMemMax;
00267 };
00268
00269
00270
00271
00272 class TXSockPipe {
00273 public:
00274
00275 TXSockPipe(const char *loc = "");
00276 virtual ~TXSockPipe();
00277
00278 Bool_t IsValid() const { return ((fPipe[0] >= 0 && fPipe[1] >= 0) ? kTRUE : kFALSE); }
00279
00280 TXSocket *GetLastReady();
00281
00282 Int_t GetRead() const { return fPipe[0]; }
00283 Int_t Post(TSocket *s);
00284 Int_t Clean(TSocket *s);
00285 Int_t Flush(TSocket *s);
00286 void DumpReadySock();
00287
00288 void SetLoc(const char *loc = "") { fLoc = loc; }
00289
00290 private:
00291 TMutex fMutex;
00292 Int_t fPipe[2];
00293 TString fLoc;
00294 TList fReadySock;
00295 };
00296
00297
00298
00299
00300 class TXSemaphoreGuard {
00301 public:
00302
00303 TXSemaphoreGuard(TSemaphore *sem) : fSem(sem), fValid(kTRUE) { if (!fSem || fSem->TryWait()) fValid = kFALSE; }
00304 virtual ~TXSemaphoreGuard() { if (fValid && fSem) fSem->Post(); }
00305
00306 Bool_t IsValid() const { return fValid; }
00307
00308 private:
00309 TSemaphore *fSem;
00310 Bool_t fValid;
00311 };
00312
00313 #endif