XrdCmsClientMan.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                    X r d C m s C l i e n t M a n . c c                     */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //          $Id: XrdCmsClientMan.cc 38011 2011-02-08 18:35:57Z ganis $
00012 
00013 // Based on: XrdOdcManager.cc,v 1.13 2006/09/26 07:49:14 abh
00014 
// Revision identification string for this source file.
//
const char *XrdCmsClientManCVSID =
           "$Id: XrdCmsClientMan.cc 38011 2011-02-08 18:35:57Z ganis $";
00016 
00017 #include <time.h>
00018 
00019 #include "XrdCms/XrdCmsClientMan.hh"
00020 #include "XrdCms/XrdCmsClientMsg.hh"
00021 #include "XrdCms/XrdCmsLogin.hh"
00022 #include "XrdCms/XrdCmsTrace.hh"
00023 
00024 #include "XrdSys/XrdSysError.hh"
00025 #include "XrdSys/XrdSysTimer.hh"
00026 
00027 #include "Xrd/XrdInet.hh"
00028 #include "Xrd/XrdLink.hh"
00029 
00030 using namespace XrdCms;
00031  
00032 /******************************************************************************/
00033 /*                               G l o b a l s                                */
00034 /******************************************************************************/
00035   
00036 extern        XrdInet *XrdXrootdNetwork;
00037 
00038 XrdNetBufferQ XrdCmsClientMan::BuffQ(2048,64);
00039 
00040 char          XrdCmsClientMan::doDebug   = 0;
00041 
00042 char         *XrdCmsClientMan::ConfigFN  = 0;
00043 
00044 XrdSysMutex   XrdCmsClientMan::manMutex;
00045 
00046 /******************************************************************************/
00047 /*                           C o n s t r u c t o r                            */
00048 /******************************************************************************/
00049   
00050 XrdCmsClientMan::XrdCmsClientMan(char *host, int port, 
00051                                  int cw, int nr, int rw, int rd)
00052                 : syncResp(0)
00053 {
00054    static XrdSysMutex initMutex;
00055    static int Instance = 0;
00056    char  *dot;
00057 
00058    Host    = strdup(host);
00059    if ((dot = index(Host, '.')))
00060       {*dot = '\0'; HPfx = strdup(Host); *dot = '.';}
00061       else HPfx = strdup(Host);
00062    Port    = port;
00063    Link    = 0;
00064    Active  = 0;
00065    Silent  = 0;
00066    Suspend = 1;
00067    RecvCnt = 0;
00068    nrMax   = nr;
00069    NetBuff = BuffQ.Alloc();
00070    repWMax = rw;
00071    repWait = 0;
00072    minDelay= rd;
00073    maxDelay= rd*3;
00074    chkCount= chkVal;
00075    lastUpdt= lastTOut = time(0);
00076 
00077 // Compute dally value
00078 //
00079    dally = cw / 2 - 1;
00080    if (dally < 3) dally = 3;
00081       else if (dally > 10) dally = 10;
00082 
00083 // Provide a unique mask number for this manager
00084 //
00085    initMutex.Lock();
00086    manMask = 1<<Instance++;
00087    initMutex.UnLock();
00088 }
00089 
00090 /******************************************************************************/
00091 /*                            D e s t r u c t o r                             */
00092 /******************************************************************************/
00093 
00094 XrdCmsClientMan::~XrdCmsClientMan()
00095 {
00096   if (Link)    Link->Close();
00097   if (Host)    free(Host);
00098   if (HPfx)    free(HPfx);
00099   if (NetBuff) NetBuff->Recycle();
00100 }
00101   
00102 /******************************************************************************/
00103 /*                             d e l a y R e s p                              */
00104 /******************************************************************************/
00105   
00106 int XrdCmsClientMan::delayResp(XrdOucErrInfo &Resp)
00107 {
00108    XrdCmsResp *rp;
00109    int msgid;
00110 
00111 // Obtain the message ID
00112 //
00113    if (!(msgid = Resp.getErrInfo()))
00114       {Say.Emsg("Manager", Host, "supplied invalid waitr msgid");
00115        Resp.setErrInfo(0, "redirector protocol error");
00116        syncResp.Post();
00117        return -EINVAL;
00118       }
00119 
00120 // Allocate a delayed response object
00121 //
00122    if (!(rp = XrdCmsResp::Alloc(&Resp, msgid)))
00123       {Say.Emsg("Manager",ENOMEM,"allocate resp object for",Resp.getErrUser());
00124        Resp.setErrInfo(0, "0");
00125        syncResp.Post();
00126        return -EAGAIN;
00127       }
00128 
00129 // Add this object to our delayed response queue. If the manager bounced then
00130 // purge all of the pending repsonses to avoid sending wrong ones.
00131 //
00132    if (msgid < maxMsgID) RespQ.Purge();
00133    maxMsgID = msgid;
00134    RespQ.Add(rp);
00135 
00136 // Tell client to wait for response. The semaphore post allows the manager
00137 // to get the next message from the cmsd. This prevents us from getting the
00138 // delayed response before the response object is added to the queue.
00139 //
00140    Resp.setErrInfo(0, "");
00141    syncResp.Post();
00142    return -EINPROGRESS;
00143 }
00144 
00145 /******************************************************************************/
00146 /*                                  S e n d                                   */
00147 /******************************************************************************/
00148   
00149 int XrdCmsClientMan::Send(char *msg, int mlen)
00150 {
00151    int allok = 0;
00152 
00153 // Determine message length
00154 //
00155    if (!mlen) mlen = strlen(msg);
00156 
00157 // Send the request
00158 //
00159    if (Active)
00160       {myData.Lock();
00161        if (Link)
00162           {if (!(allok = Link->Send(msg, mlen) > 0))
00163               {Active = 0;
00164                Link->Close(1);
00165               } else SendCnt++;
00166           }
00167        myData.UnLock();
00168       }
00169 
00170 // All done
00171 //
00172    return allok;
00173 }
00174 
00175 /******************************************************************************/
00176   
00177 int XrdCmsClientMan::Send(const struct iovec *iov, int iovcnt, int iotot)
00178 {
00179    int allok = 0;
00180 
00181 // Send the request
00182 //
00183    if (Active)
00184       {myData.Lock();
00185        if (Link)
00186           {if (!(allok = Link->Send(iov, iovcnt, iotot) > 0))
00187               {Active = 0;
00188                Link->Close(1);
00189               } else SendCnt++;
00190           }
00191        myData.UnLock();
00192       }
00193 
00194 // All done
00195 //
00196    return allok;
00197 }
00198 
00199 /******************************************************************************/
00200 /*                                 S t a r t                                  */
00201 /******************************************************************************/
00202   
00203 void *XrdCmsClientMan::Start()
00204 {
00205 
00206 // First step is to connect to the manager
00207 //
00208    do {Hookup();
00209        // Now simply start receiving messages on the stream. When we get a
00210        // respwait reply then we must be assured that the object representing
00211        // the request is added to the queue before the actual reply arrives.
00212        // We do this by waiting on syncResp which is posted once the request
00213        // object is fully processed. The actual response associated with the
00214        // respwait is synchronized during the callback phase since the client
00215        // must receive the respwait before the subsequent response.
00216        //
00217        while(Receive())
00218                  if (Response.modifier & CmsResponse::kYR_async) relayResp();
00219             else if (Response.rrCode == kYR_status) setStatus();
00220             else if (XrdCmsClientMsg::Reply(HPfx, Response, NetBuff))
00221                     {if (Response.rrCode == kYR_waitresp) syncResp.Wait();}
00222 
00223        // Tear down the connection
00224        //
00225        myData.Lock();
00226        if (Link) {Link->Close(); Link = 0;}
00227        Active = 0; Suspend = 1;
00228        myData.UnLock();
00229 
00230        // Indicate the problem
00231        //
00232        Say.Emsg("ClientMan", "Disconnected from", Host);
00233        XrdSysTimer::Snooze(dally);
00234       } while(1);
00235 
00236 // We should never get here
00237 //
00238    return (void *)0;
00239 }
00240 
00241 /******************************************************************************/
00242 /*                               w h a t s U p                                */
00243 /******************************************************************************/
00244   
00245 int  XrdCmsClientMan::whatsUp(const char *user, const char *path)
00246 {
00247    EPNAME("whatsUp");
00248    int theDelay, inQ;
00249 
00250 // The cmsd did not respond. Increase silent count and see if restart is needed
00251 // Otherwise, increase the wait interval just in case things are just slow.
00252 //
00253    myData.Lock();
00254    if (Active)
00255       {if (Active == RecvCnt)
00256           {if ((time(0)-lastTOut) >= repWait)
00257               {Silent++;
00258                if (Silent > nrMax)
00259                   {Active = 0; Silent = 0; Suspend = 1;
00260                    if (Link) Link->Close(1);
00261                   } else if (Silent & 0x02 && repWait < repWMax) repWait++;
00262               }
00263           } else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
00264       }
00265 
00266 // Calclulate how long to delay the client. This will be based on the number
00267 // of outstanding requests bounded by the config delay value.
00268 //
00269    inQ = XrdCmsClientMsg::inQ();
00270    theDelay = inQ * qTime;
00271    myData.UnLock();
00272    theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
00273    if (theDelay < minDelay) return minDelay;
00274    if (theDelay > maxDelay) return maxDelay;
00275 
00276 // Do Some tracing here
00277 //
00278    TRACE(Redirect, user <<" no resp from " <<HPfx  <<"; inQ " <<inQ <<" wait " <<theDelay <<" path=" <<path);
00279    return theDelay;
00280 }
00281 
00282 /******************************************************************************/
00283 /*                       P r i v a t e   M e t h o d s                        */
00284 /******************************************************************************/
00285 /******************************************************************************/
00286 /*                                H o o k u p                                 */
00287 /******************************************************************************/
00288   
00289 int XrdCmsClientMan::Hookup()
00290 {
00291    EPNAME("Hookup");
00292    CmsLoginData Data;
00293    XrdLink *lp;
00294    char buff[256];
00295    int rc, oldWait, tries = 12, opts = 0;
00296 
00297 // Turn off our debugging and version flags
00298 //
00299    manMutex.Lock();
00300    doDebug    &= ~manMask;
00301    manMutex.UnLock();
00302 
00303 // Keep trying to connect to the manager. Note that we bind the link to this
00304 // thread to make sure we get notified should another thread close the socket.
00305 //
00306    do {while(!(lp = XrdXrootdNetwork->Connect(Host, Port, opts)))
00307             {XrdSysTimer::Snooze(dally);
00308              if (tries--) opts = XRDNET_NOEMSG;
00309                 else     {opts = 0; tries = 12;}
00310              continue;
00311             }
00312        lp->Bind(XrdSysThread::ID());
00313        memset(&Data, 0, sizeof(Data));
00314        Data.Mode = CmsLoginData::kYR_director;
00315        Data.HoldTime = static_cast<int>(getpid());
00316        if (!(rc = XrdCmsLogin::Login(lp, Data))) break;
00317        lp->Close();
00318        XrdSysTimer::Snooze(dally);
00319       } while(1);
00320 
00321 // Establish global state
00322 //
00323    manMutex.Lock();
00324    doDebug |= (Data.Mode & CmsLoginData::kYR_debug ? manMask : 0);
00325    manMutex.UnLock();
00326 
00327 // All went well, finally
00328 //
00329    myData.Lock();
00330    Link     = lp;
00331    Active   = 1;
00332    Silent   = 0;
00333    RecvCnt  = 1;
00334    SendCnt  = 1;
00335    Suspend  = (Data.Mode & CmsLoginData::kYR_suspend);
00336 
00337 // Calculate how long we will wait for replies before delaying the client.
00338 // This is computed dynamically based on the expected response window.
00339 //
00340    if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
00341    if (Data.HoldTime > repWMax*1000) repWait = repWMax;
00342       else if (Data.HoldTime <= 0)   repWait = repWMax;
00343               else {repWait = Data.HoldTime*3;
00344                     repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
00345                     if (repWait > repWMax) repWait = repWMax;
00346                        else if (repWait < oldWait) repWait = oldWait;
00347                    }
00348    qTime = (Data.HoldTime < 100 ? 100 : Data.HoldTime);
00349    lastTOut = time(0);
00350    myData.UnLock();
00351 
00352 // Tell the world
00353 //
00354    sprintf(buff, "v %d", Data.Version);
00355    Say.Emsg("ClientMan", (Suspend ? "Connected to suspended" : "Connected to"),
00356                          Host, buff);
00357    DEBUG(Host <<" qt=" <<qTime <<"ms rw=" <<repWait);
00358    return 1;
00359 }
00360 
00361 /******************************************************************************/
00362 /*                               R e c e i v e                                */
00363 /******************************************************************************/
00364   
00365 int XrdCmsClientMan::Receive()
00366 {
00367 // This method is always run out of the object's main thread. Other threads
00368 // may call methods that initiate a link reset via a deferred close. We will
00369 // notice that here because the file descriptor will be closed. This will
00370 // cause us to return an error and precipitate a connection teardown.
00371 //
00372    EPNAME("Receive")
00373    if (Link->RecvAll((char *)&Response, sizeof(Response)) > 0)
00374       {int dlen = static_cast<int>(ntohs(Response.datalen));
00375        RecvCnt++; NetBuff->dlen = dlen;
00376        DEBUG(Link->Name() <<' ' <<dlen <<" bytes on " <<Response.streamid);
00377        if (!dlen) return 1;
00378        if (dlen > NetBuff->BuffSize())
00379           Say.Emsg("ClientMan", "Excessive msg length from", Host);
00380           else return Link->RecvAll(NetBuff->data, dlen);
00381       }
00382    return 0;
00383 }
00384 
00385 /******************************************************************************/
00386 /*                             r e l a y R e s p                              */
00387 /******************************************************************************/
00388   
00389 void XrdCmsClientMan::relayResp()
00390 {
00391    EPNAME("relayResp");
00392    XrdCmsResp *rp;
00393 
00394 // Remove the response object from our queue.
00395 //
00396    if (!(rp = RespQ.Rem(Response.streamid)))
00397       {DEBUG(Host <<" replied to non-existent request; id=" <<Response.streamid);
00398        return;
00399       }
00400 
00401 // Queue the request for reply (this transfers the network buffer)
00402 //
00403    rp->Reply(HPfx, Response, NetBuff);
00404 
00405 // Obtain a new network buffer
00406 //
00407    NetBuff = BuffQ.Alloc();
00408 }
00409 
00410 /******************************************************************************/
00411 /*                             c h k S t a t u s                              */
00412 /******************************************************************************/
00413 
00414 void XrdCmsClientMan::chkStatus()
00415 {
00416    static CmsUpdateRequest Updt = {{0, kYR_update, 0, 0}};
00417    time_t nowTime;
00418 
00419 // Count down the query count and ask again every 30 seconds
00420 //
00421    myData.Lock();
00422    if (!chkCount--)
00423       {chkCount = chkVal;
00424        nowTime = time(0);
00425        if ((nowTime - lastUpdt) >= 30)
00426           {lastUpdt = nowTime;
00427            if (Active) Link->Send((char *)&Updt, sizeof(Updt));
00428           }
00429       }
00430    myData.UnLock();
00431 }
00432   
00433 /******************************************************************************/
00434 /*                             s e t S t a t u s                              */
00435 /******************************************************************************/
00436 
00437 void XrdCmsClientMan::setStatus()
00438 {
00439    EPNAME("setStatus");
00440    const char *State = 0, *Event = "?";
00441 
00442 
00443    myData.Lock();
00444    if (Response.modifier & CmsStatusRequest::kYR_Suspend)
00445       {Event = "suspend";
00446        if (!Suspend) {Suspend = 1; State = "suspended";}
00447       }
00448       else if (Response.modifier & CmsStatusRequest::kYR_Resume)
00449               {Event = "resume";
00450                if (Suspend) {Suspend = 0; State = "resumed";}
00451               }
00452    myData.UnLock();
00453 
00454    DEBUG(Host <<" sent " <<Event <<" event");
00455    if (State) Say.Emsg("setStatus", "Manager", Host, State);
00456 }

Generated on Tue Jul 5 14:46:23 2011 for ROOT_528-00b_version by  doxygen 1.5.1