XrdClientConn.hh

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdClientConn                                                        //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (INFN Padova, 2004)                          //
00006 // Adapted from TXNetFile (root.cern.ch) originally done by             //
00007 //  Alvise Dorigo, Fabrizio Furano                                      //
00008 //          INFN Padova, 2003                                           //
00009 //                                                                      //
00010 // High level handler of connections to xrootd.                         //
00011 //                                                                      //
00012 //////////////////////////////////////////////////////////////////////////
00013 
00014 //       $Id: XrdClientConn.hh 30949 2009-11-02 16:37:58Z ganis $
00015 
00016 #ifndef XRD_CONN_H
00017 #define XRD_CONN_H
00018 
00019 
00020 #include "XrdClient/XrdClientConst.hh"
00021 
00022 #include "time.h"
00023 #include "XrdClient/XrdClientConnMgr.hh"
00024 #include "XrdClient/XrdClientMessage.hh"
00025 #include "XrdClient/XrdClientUrlInfo.hh"
00026 #include "XrdClient/XrdClientReadCache.hh"
00027 #include "XrdOuc/XrdOucHash.hh"
00028 
00029 #define ConnectionManager XrdClientConn::GetConnectionMgr()
00030 
00031 class XrdClientAbs;
00032 class XrdSecProtocol;
00033 
00034 class XrdClientConn {
00035 
00036 public:
00037 
00038     enum ESrvErrorHandlerRetval {
00039         kSEHRReturnMsgToCaller   = 0,
00040         kSEHRBreakLoop           = 1,
00041         kSEHRContinue            = 2,
00042         kSEHRReturnNoMsgToCaller = 3,
00043         kSEHRRedirLimitReached   = 4
00044     };
00045     enum EThreeStateReadHandler {
00046         kTSRHReturnMex     = 0,
00047         kTSRHReturnNullMex = 1,
00048         kTSRHContinue      = 2
00049     };
00050 
00051     // To keep info about an open session
00052     struct                     SessionIDInfo {
00053         char id[16];
00054     };
00055 
00056     int                        fLastDataBytesRecv;
00057     int                        fLastDataBytesSent;
00058     XErrorCode                 fOpenError;      
00059 
00060     XrdOucString               fRedirOpaque;        // Opaque info returned by the server when
00061 
00062     // redirecting. To be used in the next opens
00063     XrdClientConn();
00064     virtual ~XrdClientConn();
00065 
00066     inline bool                CacheWillFit(long long bytes) {
00067         if (!fMainReadCache)
00068             return FALSE;
00069         return fMainReadCache->WillFit(bytes);
00070     }
00071 
00072     bool                       CheckHostDomain(XrdOucString hostToCheck);
00073     short                      Connect(XrdClientUrlInfo Host2Conn,
00074                                        XrdClientAbsUnsolMsgHandler *unsolhandler);
00075     void                       Disconnect(bool ForcePhysicalDisc);
00076     virtual bool               GetAccessToSrv();
00077     XReqErrorType              GoBackToRedirector();
00078 
00079     XrdOucString               GetClientHostDomain() { return fgClientHostDomain; }
00080 
00081 
00082     static XrdClientPhyConnection     *GetPhyConn(int LogConnID);
00083 
00084 
00085     // --------- Cache related stuff
00086 
00087     long                       GetDataFromCache(const void *buffer,
00088                                                 long long begin_offs,
00089                                                 long long end_offs,
00090                                                 bool PerfCalc,
00091                                                 XrdClientIntvList &missingblks,
00092                                                 long &outstandingblks );
00093 
00094     bool                       SubmitDataToCache(XrdClientMessage *xmsg,
00095                                                  long long begin_offs,
00096                                                  long long end_offs);
00097 
00098     bool                       SubmitRawDataToCache(const void *buffer,
00099                                                  long long begin_offs,
00100                                                  long long end_offs);
00101 
00102     void                       SubmitPlaceholderToCache(long long begin_offs,
00103                                                         long long end_offs) {
00104         if (fMainReadCache)
00105             fMainReadCache->PutPlaceholder(begin_offs, end_offs);
00106     }
00107 
00108   
00109     void                       RemoveAllDataFromCache(bool keepwriteblocks=true) {
00110         if (fMainReadCache)
00111             fMainReadCache->RemoveItems(keepwriteblocks);
00112     }
00113 
00114     void                       RemoveDataFromCache(long long begin_offs,
00115                                                    long long end_offs, bool remove_overlapped = false) {
00116         if (fMainReadCache)
00117           fMainReadCache->RemoveItems(begin_offs, end_offs, remove_overlapped);
00118     }
00119 
00120     void                       RemovePlaceholdersFromCache() {
00121         if (fMainReadCache)
00122             fMainReadCache->RemovePlaceholders();
00123     }
00124 
00125     void                       PrintCache() {
00126         if (fMainReadCache)
00127             fMainReadCache->PrintCache();
00128     }
00129 
00130 
00131   bool                       GetCacheInfo(
00132                                           // The actual cache size
00133                                           int &size,
00134 
00135                                           // The number of bytes submitted since the beginning
00136                                           long long &bytessubmitted,
00137 
00138                                           // The number of bytes found in the cache (estimate)
00139                                           long long &byteshit,
00140 
00141                                           // The number of reads which did not find their data
00142                                           // (estimate)
00143                                           long long &misscount,
00144 
00145                                           // miss/totalreads ratio (estimate)
00146                                           float &missrate,
00147 
00148                                           // number of read requests towards the cache
00149                                           long long &readreqcnt,
00150 
00151                                           // ratio between bytes found / bytes submitted
00152                                           float &bytesusefulness
00153                                           ) {
00154       if (!fMainReadCache) return false;
00155 
00156       fMainReadCache->GetInfo(size,
00157                               bytessubmitted,
00158                               byteshit,
00159                               misscount,
00160                               missrate,
00161                               readreqcnt,
00162                               bytesusefulness);
00163       return true;
00164     }  
00165                                           
00166 
00167     void                       SetCacheSize(int CacheSize) {
00168         if (!fMainReadCache && CacheSize)
00169           fMainReadCache = new XrdClientReadCache();
00170 
00171         if (fMainReadCache)
00172            fMainReadCache->SetSize(CacheSize);
00173     }
00174 
00175     void                       SetCacheRmPolicy(int RmPolicy) {
00176         if (fMainReadCache)
00177            fMainReadCache->SetBlkRemovalPolicy(RmPolicy);
00178     }
00179 
00180     void                       UnPinCacheBlk(long long begin_offs, long long end_offs) {
00181         fMainReadCache->UnPinCacheBlk(begin_offs, end_offs);
00182         // Also use this to signal the possibility to proceed for a hard checkpoint
00183         fWriteWaitAck->Broadcast();
00184     }
00185 
00186 
00187     // -------------------
00188 
00189 
00190     int                        GetLogConnID() const { return fLogConnID; }
00191 
00192     ERemoteServerType          GetServerType() const { return fServerType; }
00193 
00194     kXR_unt16                  GetStreamID() const { return fPrimaryStreamid; }
00195 
00196     inline XrdClientUrlInfo    *GetLBSUrl() { return fLBSUrl; }
00197     inline XrdClientUrlInfo    GetCurrentUrl() { return fUrl; }
00198     inline XrdClientUrlInfo    GetRedirUrl() { return fREQUrl; }
00199 
00200     XErrorCode                 GetOpenError() const { return fOpenError; }
00201     virtual XReqErrorType      GoToAnotherServer(XrdClientUrlInfo &newdest);
00202     bool                       IsConnected() const { return fConnected; }
00203     bool                       IsPhyConnConnected();
00204 
00205     struct ServerResponseHeader
00206                                LastServerResp;
00207 
00208     struct ServerResponseBody_Error
00209                                LastServerError;
00210 
00211     void                       ClearLastServerError() {
00212                                    memset(&LastServerError, 0, sizeof(LastServerError));
00213                                    LastServerError.errnum = kXR_noErrorYet;
00214                                }
00215 
00216     UnsolRespProcResult        ProcessAsynResp(XrdClientMessage *unsolmsg);
00217 
00218     virtual bool               SendGenCommand(ClientRequest *req, 
00219                                               const void *reqMoreData,       
00220                                               void **answMoreDataAllocated,
00221                                               void *answMoreData, bool HasToAlloc,
00222                                               char *CmdName, int substreamid = 0);
00223 
00224     int                        GetOpenSockFD() const { return fOpenSockFD; }
00225 
00226     void                       SetClientHostDomain(const char *src) { fgClientHostDomain = src; }
00227     void                       SetConnected(bool conn) { fConnected = conn; }
00228 
00229     void                       SetOpenError(XErrorCode err) { fOpenError = err; }
00230 
00231     // Gets a parallel stream id to use to set the return path for a re
00232     int                        GetParallelStreamToUse(int reqsperstream);
00233     int                        GetParallelStreamCount();     // Returns the total number of connected streams
00234 
00235     void                       SetRedirHandler(XrdClientAbs *rh) { fRedirHandler = rh; }
00236 
00237     void                       SetRequestedDestHost(char *newh, kXR_int32 port) {
00238         fREQUrl = fUrl;
00239         fREQUrl.Host = newh;
00240         fREQUrl.Port = port;
00241         fREQUrl.SetAddrFromHost();
00242     }
00243 
00244     // Puts this instance in pause state for wsec seconds.
00245     // A value <= 0 revokes immediately the pause state
00246     void                       SetREQPauseState(kXR_int32 wsec) {
00247         // Lock mutex
00248         fREQWait->Lock();
00249 
00250         if (wsec > 0)
00251             fREQWaitTimeLimit = time(0) + wsec;
00252         else {
00253             fREQWaitTimeLimit = 0;
00254             fREQWait->Broadcast();
00255         }
00256 
00257         // UnLock mutex
00258         fREQWait->UnLock();
00259     }
00260 
00261     // Puts this instance in connect-pause state for wsec seconds.
00262     // Any future connection attempt will not happen before wsec
00263     //  and the first one will be towards the given host
00264     void                       SetREQDelayedConnectState(kXR_int32 wsec) {
00265         // Lock mutex
00266         fREQConnectWait->Lock();
00267 
00268         if (wsec > 0)
00269             fREQConnectWaitTimeLimit = time(0) + wsec;
00270         else {
00271             fREQConnectWaitTimeLimit = 0;
00272             fREQConnectWait->Broadcast();
00273         }
00274 
00275         // UnLock mutex
00276         fREQConnectWait->UnLock();
00277     }
00278 
00279     void                       SetSID(kXR_char *sid);
00280     inline void                SetUrl(XrdClientUrlInfo thisUrl) { fUrl = thisUrl; }
00281 
00282     // Sends the request to the server, through logconn with ID LogConnID
00283     // The request is sent with a streamid 'child' of the current one, then marked as pending
00284     // Its answer will be caught asynchronously
00285     XReqErrorType              WriteToServer_Async(ClientRequest *req, 
00286                                                    const void* reqMoreData,
00287                                                    int substreamid = 0);
00288 
00289     static XrdClientConnectionMgr *GetConnectionMgr()
00290     { return fgConnectionMgr;} //Instance of the conn manager
00291 
00292     void GetSessionID(SessionIDInfo &sess) {
00293       XrdOucString sessname;
00294       char buf[20];
00295       
00296       snprintf(buf, 20, "%d", fUrl.Port);
00297 
00298       sessname = fUrl.HostAddr;
00299       if (sessname.length() <= 0)
00300         sessname = fUrl.Host;
00301 
00302       sessname += ":";
00303       sessname += buf;
00304 
00305       sess = *( fSessionIDRepo.Find(sessname.c_str()) );
00306     }
00307 
00308     long                       GetServerProtocol() { return fServerProto; }
00309 
00310     short                      GetMaxRedirCnt() const { return fMaxGlobalRedirCnt; }
00311     void                       SetMaxRedirCnt(short mx) {fMaxGlobalRedirCnt = mx; }
00312     short                      GetRedirCnt() const { return fGlobalRedirCnt; }
00313 
00314     bool                       DoWriteSoftCheckPoint();
00315     bool                       DoWriteHardCheckPoint();
00316     void                       UnPinCacheBlk();
00317 
00318 
00319     // To give a max number of seconds for an operation to complete, no matter what happens inside
00320     // e.g. redirections, sleeps, failed connection attempts etc.
00321     void                       SetOpTimeLimit(int delta_secs);
00322     bool                       IsOpTimeLimitElapsed(time_t timenow);
00323 
00324 
00325 protected:
00326     void                       SetLogConnID(int cid) { fLogConnID = cid; }
00327     void                       SetStreamID(kXR_unt16 sid) { fPrimaryStreamid = sid; }
00328 
00329 
00330 
00331     // The handler which first tried to connect somewhere
00332     XrdClientAbsUnsolMsgHandler *fUnsolMsgHandler;
00333 
00334     XrdClientUrlInfo           fUrl;                // The current URL
00335     XrdClientUrlInfo           *fLBSUrl;            // Needed to save the load balancer url
00336     XrdClientUrlInfo           fREQUrl;             // For explicitly requested redirs
00337 
00338     short                      fGlobalRedirCnt;    // Number of redirections
00339 
00340 private:
00341 
00342     static XrdOucString        fgClientHostDomain; // Save the client's domain name
00343     bool                       fConnected;
00344     bool                       fGettingAccessToSrv; // To avoid recursion in desperate situations
00345     time_t                     fGlobalRedirLastUpdateTimestamp; // Timestamp of last redirection
00346 
00347     int                        fLogConnID;        // Logical connection ID used
00348     kXR_unt16                  fPrimaryStreamid;  // Streamid used for normal communication
00349     // NB it's a copy of the one contained in
00350     // the logconn
00351 
00352     short                      fMaxGlobalRedirCnt;
00353     XrdClientReadCache         *fMainReadCache;
00354 
00355     // The time limit for a transaction
00356     time_t                     fOpTimeLimit;
00357 
00358     XrdClientAbs               *fRedirHandler;      // Pointer to a class inheriting from
00359     // XrdClientAbs providing methods
00360     // to handle a redir at higher level
00361 
00362     XrdOucString               fRedirInternalToken; // Token returned by the server when
00363     // redirecting. To be used in the next logins
00364 
00365     XrdSysCondVar              *fREQWaitResp;           // For explicitly requested delayed async responses
00366     ServerResponseBody_Attn_asynresp *
00367                                fREQWaitRespData;        // For explicitly requested delayed async responses
00368 
00369     time_t                     fREQWaitTimeLimit;   // For explicitly requested pause state
00370     XrdSysCondVar              *fREQWait;           // For explicitly requested pause state
00371     time_t                     fREQConnectWaitTimeLimit;   // For explicitly requested delayed reconnect
00372     XrdSysCondVar              *fREQConnectWait;           // For explicitly requested delayed reconnect
00373 
00374     long                       fServerProto;        // The server protocol
00375     ERemoteServerType          fServerType;         // Server type as returned by doHandShake() 
00376 
00377     static XrdOucHash<SessionIDInfo>
00378     fSessionIDRepo;      // The repository of session IDs, shared.
00379     // Association between
00380     // <hostname>:<port> and a SessionIDInfo struct
00381 
00382     int                        fOpenSockFD;         // Descriptor of the underlying socket
00383     static XrdClientConnectionMgr *fgConnectionMgr; //Instance of the Connection Manager
00384 
00385     XrdSysCondVar              *fWriteWaitAck;
00386     XrdClientVector<ClientRequest> fWriteReqsToRetry; // To store the write reqs to retry in case of a disconnection
00387 
00388     bool                       CheckErrorStatus(XrdClientMessage *, short &, char *);
00389     void                       CheckPort(int &port);
00390     void                       CheckREQPauseState();
00391     void                       CheckREQConnectWaitState();
00392     bool                       CheckResp(struct ServerResponseHeader *resp, const char *method);
00393     XrdClientMessage           *ClientServerCmd(ClientRequest *req,
00394                                                 const void *reqMoreData,
00395                                                 void **answMoreDataAllocated,
00396                                                 void *answMoreData,
00397                                                 bool HasToAlloc,
00398                                                 int substreamid = 0);
00399     XrdSecProtocol            *DoAuthentication(char *plist, int plsiz);
00400 
00401     ERemoteServerType          DoHandShake(short log);
00402 
00403     bool                       DoLogin();
00404     bool                       DomainMatcher(XrdOucString dom, XrdOucString domlist);
00405 
00406     XrdOucString               GetDomainToMatch(XrdOucString hostname);
00407 
00408     ESrvErrorHandlerRetval     HandleServerError(XReqErrorType &, XrdClientMessage *,
00409                                                  ClientRequest *);
00410     bool                       MatchStreamid(struct ServerResponseHeader *ServerResponse);
00411 
00412     // Sends a close request, without waiting for an answer
00413     // useful (?) to be sent just before closing a badly working stream
00414     bool                       PanicClose();
00415 
00416     XrdOucString               ParseDomainFromHostname(XrdOucString hostname);
00417 
00418     XrdClientMessage           *ReadPartialAnswer(XReqErrorType &, size_t &, 
00419                                                   ClientRequest *, bool, void**,
00420                                                   EThreeStateReadHandler &);
00421 
00422     void                       ClearSessionID();
00423 
00424     XReqErrorType              WriteToServer(ClientRequest *req, 
00425                                              const void* reqMoreData,
00426                                              short LogConnID,
00427                                              int substreamid = 0);
00428 
00429     bool                       WaitResp(int secsmax);
00430 };
00431 
00432 
00433 
00434 #endif

Generated on Tue Jul 5 14:46:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1