XrdCmsRRQ.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                          X r d C m s R R Q . c c                           */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //         $Id: XrdCmsRRQ.cc 25932 2008-10-23 10:58:11Z ganis $
00012 
00013 // Original Version: 1.6 2007/07/31 02:25:16 abh
00014 
00015 const char *XrdCmsRRQCVSID = "$Id: XrdCmsRRQ.cc 25932 2008-10-23 10:58:11Z ganis $";
00016 
00017 #include <sys/types.h>
00018 #include <netinet/in.h>
00019 #include <inttypes.h>
00020 
00021 #include "XrdCms/XrdCmsCluster.hh"
00022 #include "XrdCms/XrdCmsNode.hh"
00023 #include "XrdCms/XrdCmsRRQ.hh"
00024 #include "XrdCms/XrdCmsRTable.hh"
00025 #include "XrdCms/XrdCmsTrace.hh"
00026 #include "XrdSys/XrdSysError.hh"
00027 #include "XrdSys/XrdSysTimer.hh"
00028 #include <stdio.h>
00029 
00030 using namespace XrdCms;
00031 
00032 // Note: Debugging statements have been commented out. This is time critical
00033 //       code and debugging may only be enabled in standalone testing as the
00034 //       delays introduced by DEBUG() will usually cause timeout failures.
00035   
00036 /******************************************************************************/
00037 /*       G l o b a l   O b j e c t s   &   S t a t i c   M e m b e r s        */
00038 /******************************************************************************/
00039   
00040 XrdCmsRRQ             XrdCms::RRQ;
00041 
00042 XrdSysMutex           XrdCmsRRQSlot::myMutex;
00043 XrdCmsRRQSlot        *XrdCmsRRQSlot::freeSlot = 0;
00044 short                 XrdCmsRRQSlot::initSlot = 0;
00045 
00046 /******************************************************************************/
00047 /*                    E x t e r n a l   F u n c t i o n s                     */
00048 /******************************************************************************/
00049   
00050 void *XrdCmsRRQ_StartTimeOut(void *parg) {return RRQ.TimeOut();}
00051 
00052 void *XrdCmsRRQ_StartRespond(void *parg) {return RRQ.Respond();}
00053 
00054 /******************************************************************************/
00055 /*               X r d C m s R R Q   C l a s s   M e t h o d s                */
00056 /******************************************************************************/
00057 /******************************************************************************/
00058 /*                                   A d d                                    */
00059 /******************************************************************************/
00060   
00061 short XrdCmsRRQ::Add(short Snum, XrdCmsRRQInfo *Info)
00062 {
00063 // EPNAME("RRQ Add");
00064    XrdCmsRRQSlot *sp;
00065 
00066 // Obtain a slot and fill it in
00067 //
00068    if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
00069 // DEBUG("adding slot " <<sp->slotNum);
00070 
00071 // If a slot number given, check if it's the right slot and it is still queued.
00072 // If so, piggy-back this request to existing one and make a fast exit
00073 //
00074    myMutex.Lock();
00075    if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
00076       {if (Info->isLU)
00077           {sp->LkUp = Slot[Snum].LkUp;
00078            Slot[Snum].LkUp = sp;
00079           } else {
00080            sp->Cont = Slot[Snum].Cont;
00081            Slot[Snum].Cont = sp;
00082           }
00083        myMutex.UnLock();
00084        return Snum;
00085       }
00086 
00087 // Queue this slot to the pending response queue and tell the timeout scheduler
00088 //
00089    sp->Expire = myClock+1;
00090    if (waitQ.Singleton()) isWaiting.Post();
00091    waitQ.Prev()->Insert(&sp->Link);
00092    myMutex.UnLock();
00093    return sp->slotNum;
00094 }
00095 
00096 /******************************************************************************/
00097 /*                                   D e l                                    */
00098 /******************************************************************************/
00099   
00100 void XrdCmsRRQ::Del(short Snum, const void *Key)
00101 {
00102      Ready(Snum, Key, 0, 0);
00103 }
00104 
00105 /******************************************************************************/
00106 /*                                  I n i t                                   */
00107 /******************************************************************************/
00108   
00109 int XrdCmsRRQ::Init(int Tint, int Tdly)
00110 {
00111    int rc;
00112    pthread_t tid;
00113 
00114 // Set values
00115 //
00116    if (Tint) Tslice = Tint;
00117    if (Tdly) Tdelay = Tdly;
00118 
00119 // Fill out the response structure
00120 //
00121    dataResp.Hdr.streamid = 0;
00122    dataResp.Hdr.rrCode   = kYR_data;
00123    dataResp.Hdr.modifier = 0;
00124    dataResp.Hdr.datalen  = 0;
00125    dataResp.Val          = 0;
00126 
00127 // Fill out the data i/o vector
00128 //
00129    data_iov[0].iov_base = (char *)&dataResp;
00130    data_iov[0].iov_len  = sizeof(dataResp);
00131    data_iov[1].iov_base = databuff;;
00132 
00133 // Fill out the response structure
00134 //
00135    redrResp.Hdr.streamid = 0;
00136    redrResp.Hdr.rrCode   = kYR_redirect;
00137    redrResp.Hdr.modifier = 0;
00138    redrResp.Hdr.datalen  = 0;
00139    redrResp.Val          = 0;
00140 
00141 // Fill out the redirect i/o vector
00142 //
00143    redr_iov[0].iov_base = (char *)&redrResp;
00144    redr_iov[0].iov_len  = sizeof(redrResp);
00145    redr_iov[1].iov_base = hostbuff;;
00146 
00147 // Fill out the wait info
00148 //
00149    waitResp.Hdr.streamid = 0;
00150    waitResp.Hdr.rrCode   = kYR_wait;
00151    waitResp.Hdr.modifier = 0;
00152    waitResp.Hdr.datalen  = htons(static_cast<unsigned short>(sizeof(waitResp.Val)));
00153    waitResp.Val          = htonl(Tdelay);
00154 
00155 // Start the responder thread
00156 //
00157    if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
00158                                0, "Request Responder")))
00159       {Say.Emsg("Config", rc, "create request responder thread");
00160        return 1;
00161       }
00162 
00163 // Start the timeout thread
00164 //
00165    if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
00166                                0, "Request Timeout")))
00167       {Say.Emsg("Config", rc, "create request timeout thread");
00168        return 1;
00169       }
00170 
00171 // All done
00172 //
00173    return 0;
00174 }
00175 
00176 /******************************************************************************/
00177 /*                                 R e a d y                                  */
00178 /******************************************************************************/
00179   
00180 void XrdCmsRRQ::Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
00181 {
00182 // EPNAME("RRQ Ready");
00183    XrdCmsRRQSlot *sp;
00184 
00185 // Check if it's the right slot and it is still queued.
00186 //
00187    myMutex.Lock();
00188    sp = &Slot[Snum];
00189    if (sp->Info.Key != Key || !sp->Expire)
00190       {myMutex.UnLock();
00191 //     DEBUG("slot " <<Snum <<" no longer valid");
00192        return;
00193       }
00194 
00195 // Move the element from the waiting queue to the ready queue
00196 //
00197    sp->Link.Remove();
00198    if (readyQ.Singleton()) isReady.Post();
00199    sp->Arg1 = mask1; sp->Arg2 = mask2;
00200    readyQ.Prev()->Insert(&sp->Link);
00201    myMutex.UnLock();
00202 // DEBUG("readied slot " <<Snum <<" mask " <<mask);
00203 }
00204 
00205 /******************************************************************************/
00206 /*                               R e s p o n d                                */
00207 /******************************************************************************/
00208   
00209 void *XrdCmsRRQ::Respond()
00210 {
00211 // EPNAME("RRQ Respond");
00212    static const int ovhd = sizeof(kXR_unt32);
00213    XrdCmsRRQSlot *lupQ, *sp, *cp;
00214    int doredir, port, hlen;
00215 
00216 // In an endless loop, process all ready elements
00217 //
00218    do {isReady.Wait();     // DEBUG("responder awoken");
00219    do {myMutex.Lock();
00220        lupQ = 0;
00221        if (readyQ.Singleton()) {myMutex.UnLock(); break;}
00222        sp = readyQ.Next()->Item(); sp->Link.Remove(); sp->Expire = 0;
00223        myMutex.UnLock();
00224        if (sp->Info.isLU) {lupQ = sp;
00225                            if (!(sp = sp->Cont)) break;
00226                            sp->Arg1 = lupQ->Arg1; sp->Arg2 = lupQ->Arg2;
00227                           } else lupQ = sp->LkUp;
00228        if ((doredir = (sp->Arg1 && Cluster.Select(sp->Info.isRW, sp->Arg1,
00229                                                  port, hostbuff, hlen))))
00230           {redrResp.Val = htonl(port);
00231            redrResp.Hdr.datalen = htons(static_cast<unsigned short>(hlen+ovhd));
00232            redr_iov[1].iov_len  = hlen;
00233            hlen += ovhd + sizeof(redrResp.Hdr);
00234           }
00235        sendResponse(&sp->Info, doredir, hlen);
00236        cp = sp->Cont;
00237        while(cp) {sendResponse(&cp->Info, doredir, hlen); cp = cp->Cont;}
00238        sp->Recycle();
00239       } while(1);
00240        if (lupQ) {lupQ->Cont = lupQ->LkUp;
00241                   sendLocResp(lupQ);
00242                   lupQ->Recycle();
00243                  }
00244       } while(1);
00245 
00246 // Keep the compiler happy
00247 //
00248    return (void *)0;
00249 }
00250 
00251 /******************************************************************************/
00252 /*                           s e n d L o c R e s p                            */
00253 /******************************************************************************/
00254   
00255 void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
00256 {
00257    static const int ovhd = sizeof(kXR_unt32);
00258    XrdCmsSelected *sP;
00259    XrdCmsNode *nP;
00260    int bytes;
00261 
00262 // Send a delay if we timed out
00263 //
00264    if (!(lP->Arg1))
00265       {do {sendResponse(&lP->Info, 0); lP = lP->Cont;} while(lP);
00266        return;
00267       }
00268 
00269 // Get the list of servers that have this file. If none found, then force the
00270 // client to wait as this should never happen and the long path is called for.
00271 //
00272    if (!(sP = Cluster.List(lP->Arg1, XrdCmsCluster::LS_IPV6))
00273    || (!(bytes = XrdCmsNode::do_LocFmt(databuff,sP,lP->Arg2,lP->Info.rwVec))))
00274       {while(lP) {sendResponse(&lP->Info, 0); lP = lP->Cont;}
00275        return;
00276       }
00277 
00278 // Complete the I/O vector
00279 //
00280    bytes++;
00281    data_iov[1].iov_len  = bytes;
00282    bytes += ovhd;
00283    dataResp.Hdr.datalen = htons(static_cast<unsigned short>(bytes));
00284    bytes += sizeof(dataResp.Hdr);
00285 
00286 // Send the reply to each waiting redirector
00287 //
00288    while(lP)
00289         {RTable.Lock();
00290          if ((nP = RTable.Find(lP->Info.Rnum, lP->Info.Rinst)))
00291             {dataResp.Hdr.streamid = lP->Info.ID;
00292              nP->Send(data_iov, iov_cnt, bytes);
00293             }
00294          RTable.UnLock();
00295          lP = lP->Cont;
00296         }
00297 }
00298 
00299 /******************************************************************************/
00300 /*                          s e n d R e s p o n s e                           */
00301 /******************************************************************************/
00302   
00303 void XrdCmsRRQ::sendResponse(XrdCmsRRQInfo *Info, int doredir, int totlen)
00304 {
00305 // EPNAME("sendResponse");
00306    XrdCmsNode *nP;
00307 
00308 // Find the redirector and send the message
00309 //
00310    RTable.Lock();
00311    if ((nP = RTable.Find(Info->Rnum, Info->Rinst)))
00312       {if (doredir){redrResp.Hdr.streamid = Info->ID;
00313                     nP->Send(redr_iov, iov_cnt, totlen);
00314 //                  DEBUG("Fast redirect " <<nP->Name() <<" -> " <<hostbuff);
00315                    }
00316               else {waitResp.Hdr.streamid = Info->ID;
00317                     nP->Send((char *)&waitResp, sizeof(waitResp));
00318 //                  DEBUG("Redirect delay " <<nP->Name() <<' ' <<Tdelay);
00319                    }
00320       } 
00321 //    else {DEBUG("redirector " <<Info->Rnum <<'.' <<Info->Rinst <<"not found");}
00322    RTable.UnLock();
00323 }
00324 
00325 /******************************************************************************/
00326 /*                               T i m e O u t                                */
00327 /******************************************************************************/
00328   
00329 void *XrdCmsRRQ::TimeOut()
00330 {
00331 // EPNAME("RRQ TimeOut");
00332    XrdCmsRRQSlot *sp;
00333 
00334 // We measure millisecond intervals to timeout waiting requests
00335 //
00336    while(1)
00337         {isWaiting.Wait();
00338          myMutex.Lock();
00339          while(1)
00340               {myClock++;
00341                myMutex.UnLock();
00342                XrdSysTimer::Wait(Tslice);
00343                myMutex.Lock();
00344                while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
00345                     {sp->Link.Remove();
00346                      if (readyQ.Singleton()) isReady.Post();
00347                      sp->Arg1 = 0; sp->Arg2 = 0;
00348 //                   DEBUG("expired slot " <<sp->slotNum);
00349                      readyQ.Prev()->Insert(&sp->Link);
00350                     }
00351                if (waitQ.Singleton()) break;
00352               }
00353          myMutex.UnLock();
00354         }
00355 
00356 // Keep the compiler happy
00357 //
00358    return (void *)0;
00359 }
00360 
00361 /******************************************************************************/
00362 /*           X r d C m s R R Q S l o t   C l a s s   M e t h o d s            */
00363 /******************************************************************************/
00364 /******************************************************************************/
00365 /*                           C o n s t r u c t o r                            */
00366 /******************************************************************************/
00367 
00368 XrdCmsRRQSlot::XrdCmsRRQSlot() : Link(this)
00369 {
00370 
00371    slotNum  = initSlot++;
00372    if (slotNum)
00373       {Cont     = freeSlot;
00374        freeSlot = this;
00375       } else Cont = 0;
00376    Arg1 = Arg2 = 0;
00377    Info.Key = 0;
00378 }
00379 
00380 /******************************************************************************/
00381 /*                                 A l l o c                                  */
00382 /******************************************************************************/
00383   
00384 XrdCmsRRQSlot *XrdCmsRRQSlot::Alloc(XrdCmsRRQInfo *theInfo)
00385 {
00386    XrdCmsRRQSlot *sp;
00387 
00388    myMutex.Lock();
00389    if ((sp = freeSlot))
00390       {sp->Info = *theInfo;
00391        freeSlot = sp->Cont;
00392        sp->Cont = 0;
00393        sp->LkUp = 0;
00394        sp->Arg1 = 0;
00395        sp->Arg2 = 0;
00396       }
00397    myMutex.UnLock();
00398    return sp;
00399 }
00400 
00401 /******************************************************************************/
00402 /*                               R e c y c l e                                */
00403 /******************************************************************************/
00404   
00405 void XrdCmsRRQSlot::Recycle()
00406 {
00407    XrdCmsRRQSlot *sp, *np = Cont;
00408 
00409    myMutex.Lock();
00410    if (!Link.Singleton()) Link.Remove();
00411    while((sp = np))
00412         {np           = sp->Cont;
00413          sp->Cont     = freeSlot;
00414          freeSlot     = sp;
00415          sp->Info.Key = 0;
00416         }
00417    Info.Key = 0;
00418    Cont     = freeSlot;
00419    freeSlot = this;
00420    myMutex.UnLock();
00421 }

Generated on Tue Jul 5 14:46:31 2011 for ROOT_528-00b_version by  doxygen 1.5.1