XrdCmsAdmin.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                        X r d C m s A d m i n . c c                         */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //         $Id: XrdCmsAdmin.cc 34000 2010-06-21 06:49:56Z ganis $
00012 
00013 // Original Version: 1.20 2007/07/31 02:25:11 abh
00014 
00015 const char *XrdCmsAdminCVSID = "$Id: XrdCmsAdmin.cc 34000 2010-06-21 06:49:56Z ganis $";
00016 
00017 #include <stdio.h>
00018 #include <limits.h>
00019 #include <unistd.h>
00020 #include <sys/types.h>
00021 
00022 #include "XProtocol/YProtocol.hh"
00023 
00024 #include "XrdCms/XrdCmsAdmin.hh"
00025 #include "XrdCms/XrdCmsConfig.hh"
00026 #include "XrdCms/XrdCmsManager.hh"
00027 #include "XrdCms/XrdCmsPrepare.hh"
00028 #include "XrdCms/XrdCmsState.hh"
00029 #include "XrdCms/XrdCmsTrace.hh"
00030 #include "XrdNet/XrdNetSocket.hh"
00031 #include "XrdOuc/XrdOuca2x.hh"
00032 #include "XrdOuc/XrdOucName2Name.hh"
00033 #include "XrdSys/XrdSysError.hh"
00034 #include "XrdSys/XrdSysPlatform.hh"
00035 
00036 using namespace XrdCms;
00037  
00038 /******************************************************************************/
00039 /*                         L o c a l   C l a s s e s                          */
00040 /******************************************************************************/
00041   
00042 namespace XrdCms
00043 {
00044 class AdminReq
00045 {
00046 public:
00047 
00048        AdminReq *Next;
00049 const  char     *Req;
00050 const  char     *Path;
00051        CmsRRHdr  Hdr;
00052        char     *Data;
00053        int       Dlen;
00054 static int       numinQ;
00055 static const int maxinQ = 1024;
00056 
00057 static AdminReq *getReq() {AdminReq *arP;
00058                            do {QPresent.Wait();
00059                                QMutex.Lock();
00060                                if ((arP = First))
00061                                   {if (!(First = arP->Next)) Last = 0;
00062                                    numinQ--;
00063                                   }
00064                                QMutex.UnLock();
00065                               } while (!arP);
00066                            return arP;
00067                           }
00068 
00069        void      Requeue() {QMutex.Lock();
00070                             Next=First; First=this; QPresent.Post(); numinQ++;
00071                             QMutex.UnLock();
00072                            }
00073 
00074           AdminReq(const char *req, XrdCmsRRData &RRD) 
00075                   : Next(0), Req(req), Path(RRD.Path ? RRD.Path : ""),
00076                     Hdr(RRD.Request), Data(RRD.Buff), Dlen(RRD.Dlen)
00077                   {RRD.Buff = 0;
00078                    QMutex.Lock();
00079                    if (Last) {Last->Next = this; Last = this;}
00080                       else    First=Last = this;
00081                    QPresent.Post();
00082                    numinQ++;
00083                    QMutex.UnLock();
00084                   }
00085 
00086      ~AdminReq() {if (Data) free(Data);}
00087 
00088 private:
00089 
00090 static XrdSysSemaphore QPresent;
00091 static XrdSysMutex     QMutex;
00092 static AdminReq       *First;
00093 static AdminReq       *Last;
00094 };
00095 };
00096 
00097 /******************************************************************************/
00098 /*                     G l o b a l s   &   S t a t i c s                      */
00099 /******************************************************************************/
00100 
00101        XrdSysSemaphore  AdminReq::QPresent(0);
00102        XrdSysMutex      AdminReq::QMutex;
00103        AdminReq        *AdminReq::First = 0;
00104        AdminReq        *AdminReq::Last  = 0;
00105        int              AdminReq::numinQ= 0;
00106 
00107        XrdSysMutex      XrdCmsAdmin::myMutex;
00108        XrdSysSemaphore *XrdCmsAdmin::SyncUp = 0;
00109        int              XrdCmsAdmin::POnline= 0;
00110 
00111 /******************************************************************************/
00112 /*            E x t e r n a l   T h r e a d   I n t e r f a c e s             */
00113 /******************************************************************************/
00114   
00115 void *XrdCmsAdminLogin(void *carg)
00116       {XrdCmsAdmin *Admin = new XrdCmsAdmin();
00117        Admin->Login(*(int *)carg);
00118        delete Admin;
00119        return (void *)0;
00120       }
00121 
00122 void *XrdCmsAdminSend(void *carg)
00123       {XrdCmsAdmin::Relay(0,0);
00124        return (void *)0;
00125       }
00126  
00127 /******************************************************************************/
00128 /*                                 L o g i n                                  */
00129 /******************************************************************************/
00130   
00131 void XrdCmsAdmin::Login(int socknum)
00132 {
00133    const char *epname = "Admin_Login";
00134    char *request, *tp;
00135 
00136 // Attach the socket FD to a stream
00137 //
00138    Stream.Attach(socknum);
00139 
00140 // The first request better be "login"
00141 //
00142    if ((request = Stream.GetLine()))
00143       {DEBUG("initial request: '" <<request <<"'");
00144        if (!(tp = Stream.GetToken()) || strcmp("login", tp) || !do_Login())
00145           {Say.Emsg(epname, "Invalid admin login sequence");
00146            return;
00147           }
00148        } else {Say.Emsg(epname, "No admin login specified");
00149                return;
00150               }
00151 
00152 // Start receiving requests on this stream
00153 //
00154    while((request = Stream.GetLine()))
00155         {DEBUG("received request: '" <<request <<"'");
00156          if ((tp = Stream.GetToken()))
00157             {     if (!strcmp("resume",   tp))
00158                       CmsState.Update(XrdCmsState::Active, 1);
00159              else if (!strcmp("rmdid",    tp)) do_RmDid();   // via lfn
00160              else if (!strcmp("newfn",    tp)) do_RmDud();   // via lfn
00161              else if (!strcmp("suspend",  tp)) 
00162                      {CmsState.Update(XrdCmsState::Active, 0);
00163                       Say.Emsg("Notes","suspend requested by",Stype,Sname);
00164                      }
00165              else Say.Emsg(epname, "invalid admin request,", tp);
00166             }
00167         }
00168 
00169 // The socket disconnected
00170 //
00171    Say.Emsg("Login", Stype, Sname, "logged out");
00172 
00173 // If this is a primary, we must suspend but do not record this event!
00174 //
00175    if (Primary) 
00176       {CmsState.Update(XrdCmsState::FrontEnd, 0, -1);
00177        myMutex.Lock();
00178        POnline = 0;
00179        Relay(1,-1);
00180        myMutex.UnLock();
00181       }
00182    return;
00183 }
00184 
00185 /******************************************************************************/
00186 /*                                 N o t e s                                  */
00187 /******************************************************************************/
00188   
00189 void *XrdCmsAdmin::Notes(XrdNetSocket *AnoteSock)
00190 {
00191    const char *epname = "Notes";
00192    char *request, *tp;
00193    int rc;
00194 
00195 // Bind the udp socket to a stream
00196 //
00197    Stream.Attach(AnoteSock->Detach());
00198    Sname = strdup("anon");
00199 
00200 // Accept notifications in an endless loop
00201 //
00202    do {while((request = Stream.GetLine()))
00203             {DEBUG("received notification: '" <<request <<"'");
00204              if ((tp = Stream.GetToken()))
00205                 {     if (!strcmp("gone",    tp)) do_RmDid(1); // via pfn
00206                  else if (!strcmp("rmdid",   tp)) do_RmDid(0); // via lfn
00207                  else if (!strcmp("have",    tp)) do_RmDud(1); // via pfn
00208                  else if (!strcmp("newfn",   tp)) do_RmDud(0); // via lfn
00209                  else if (!strcmp("nostage", tp))
00210                          {CmsState.Update(XrdCmsState::Stage, 0);
00211                           Say.Emsg("Notes","nostage requested by",Stype,Sname);
00212                          }
00213                  else if (!strcmp("stage",   tp))
00214                           CmsState.Update(XrdCmsState::Stage, 1);
00215                  else Say.Emsg(epname, "invalid notification,", tp);
00216                 }
00217             }
00218        if ((rc = Stream.LastError())) break;
00219        rc = Stream.Detach(); Stream.Attach(rc);
00220       } while(1);
00221 
00222 // We should never get here
00223 //
00224    Say.Emsg(epname, rc, "accept notification");
00225    return (void *)0;
00226 }
00227 
00228 /******************************************************************************/
00229 /*                                 R e l a y                                  */
00230 /******************************************************************************/
00231   
00232 void XrdCmsAdmin::Relay(int setSock, int newSock)
00233 {
00234    const char            *epname = "Admin_Relay";
00235    static const int       HdrSz = sizeof(CmsRRHdr);
00236    static XrdSysMutex     SMutex;
00237    static XrdSysSemaphore SReady(0);
00238    static int             curSock = -1;
00239    AdminReq              *arP;
00240    int                    retc, mySock = -1;
00241 
00242 // Set socket for writing (called from admin thread when pimary logs on)
00243    if (setSock)
00244       {SMutex.Lock();
00245        if (curSock >= 0) close(curSock);
00246           else if (newSock >= 0) SReady.Post();
00247        if (newSock < 0) curSock = -1;
00248           else {curSock = dup(newSock); XrdNetSocket::setOpts(curSock, 0);}
00249        SMutex.UnLock();
00250        return;
00251       }
00252 
00253 // This is just an endless loop
00254 //
00255    do {while(mySock < 0)
00256             {SMutex.Lock();
00257              if (curSock < 0) {SMutex.UnLock(); SReady.Wait(); SMutex.Lock();}
00258              mySock = curSock; curSock = -1;
00259              SMutex.UnLock();
00260             }
00261 
00262        do {arP = AdminReq::getReq();
00263 
00264            if ((retc = write(mySock, &arP->Hdr, HdrSz))     != HdrSz
00265            ||  (retc = write(mySock, arP->Data, arP->Dlen)) != arP->Dlen)
00266               retc = (retc < 0 ? errno : ECANCELED);
00267               else {DEBUG("sent " <<arP->Req <<' ' <<arP->Path);
00268                     delete arP; retc = 0;
00269                    }
00270           } while(retc == 0);
00271 
00272        if (retc) Say.Emsg("AdminRelay", retc, "relay", arP->Req);
00273        arP->Requeue();
00274        close(mySock);
00275        mySock = -1;
00276       } while(1);
00277 }
00278 
00279 /******************************************************************************/
00280 /*                                  S e n d                                   */
00281 /******************************************************************************/
00282   
00283 void XrdCmsAdmin::Send(const char *Req, XrdCmsRRData &Data)
00284 {
00285    AdminReq *arP;
00286 
00287    if (AdminReq::numinQ < AdminReq::maxinQ) arP = new AdminReq(Req, Data);
00288       else Say.Emsg("Send", "Queue full; ignoring", Req, Data.Path);
00289 }
00290 
00291 /******************************************************************************/
00292 /*                                 S t a r t                                  */
00293 /******************************************************************************/
00294   
00295 void *XrdCmsAdmin::Start(XrdNetSocket *AdminSock)
00296 {
00297    const char *epname = "Start";
00298    int InSock;
00299    pthread_t tid;
00300 
00301 // Start the relay thread
00302 //
00303    if (XrdSysThread::Run(&tid,XrdCmsAdminSend,(void *)0))
00304       Say.Emsg(epname, errno, "start admin relay");
00305 
00306 // If we are in independent mode then let the caller continue
00307 //
00308    if (Config.doWait)
00309       {Say.Emsg(epname, "Waiting for primary server to login.");}
00310        else if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
00311 
00312 // Accept connections in an endless loop
00313 //
00314    while(1) if ((InSock = AdminSock->Accept()) >= 0)
00315                {XrdNetSocket::setOpts(InSock, 0);
00316                 if (XrdSysThread::Run(&tid,XrdCmsAdminLogin,(void *)&InSock))
00317                    {Say.Emsg(epname, errno, "start admin");
00318                     close(InSock);
00319                    }
00320                } else Say.Emsg(epname, errno, "accept connection");
00321    return (void *)0;
00322 }
00323 
00324 /******************************************************************************/
00325 /*                       P r i v a t e   M e t h o d s                        */
00326 /******************************************************************************/
00327 /******************************************************************************/
00328 /*                              d o _ L o g i n                               */
00329 /******************************************************************************/
00330   
00331 int XrdCmsAdmin::do_Login()
00332 {
00333    const char *emsg;
00334    char buff[64], *tp, Ltype = 0;
00335    int Port = 0;
00336 
00337 // Process: login {p | P | s | u} <name> [port <port>]
00338 //
00339    if (!(tp = Stream.GetToken()))
00340       {Say.Emsg("do_Login", "login type not specified");
00341        return 0;
00342       }
00343 
00344    Ltype = *tp;
00345    if (*(tp+1) == '\0')
00346       switch (*tp)
00347              {case 'p': Stype = "Primary server"; break;
00348               case 'P': Stype = "Proxy server";   break;
00349               case 's': Stype = "Server";         break;
00350               case 'u': Stype = "Admin";          break;
00351               default:  Ltype = 0;                break;
00352              }
00353 
00354    if (!Ltype)
00355       {Say.Emsg("do_Login", "Invalid login type,", tp);
00356        return 0;
00357       } else Ltype = *tp;
00358 
00359    if (!(tp = Stream.GetToken()))
00360       {Say.Emsg("do_Login", "login name not specified");
00361        return 0;
00362       } else Sname = strdup(tp);
00363 
00364 // Get any additional options
00365 //
00366    while((tp = Stream.GetToken()))
00367         {     if (!strcmp(tp, "port"))
00368                  {if (!(tp = Stream.GetToken()))
00369                      {Say.Emsg("do_Login", "login port not specified");
00370                       return 0;
00371                      }
00372                   if (XrdOuca2x::a2i(Say,"login port",tp,&Port,0))
00373                      return 0;
00374                  }
00375          else    {Say.Emsg("do_Login", "invalid login option -", tp);
00376                   return 0;
00377                  }
00378         }
00379 
00380 // If this is not a primary, we are done. Otherwise there is much more. We
00381 // must make sure we are compatible with the login
00382 //
00383    if (Ltype != 'p' && Ltype != 'P') return 1;
00384         if (Ltype == 'p' &&  Config.asProxy()) emsg = "only accepts proxies";
00385    else if (Ltype == 'P' && !Config.asProxy()) emsg = "does not accept proxies";
00386    else                                        emsg = 0;
00387    if (emsg) 
00388       {Say.Emsg("do_login", "Server login rejected; configured role", emsg);
00389        return 0;
00390       }
00391 
00392 // Discard login if this is a duplicate primary server
00393 //
00394    myMutex.Lock();
00395    if (POnline)
00396       {myMutex.UnLock();
00397        Say.Emsg("do_Login", "Primary server already logged in; login of", 
00398                                    tp, "rejected.");
00399        return 0;
00400       }
00401 
00402 // Indicate we have a primary
00403 //
00404    Primary = 1;
00405    POnline = 1;
00406    Relay(1, Stream.FDNum());
00407    CmsState.Update(XrdCmsState::FrontEnd, 1, Port);
00408 
00409 // Check if this is the first primary login and resume if we must
00410 //
00411    if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
00412    myMutex.UnLock();
00413 
00414 // Document the login
00415 //
00416    sprintf(buff, "logged in; data port is %d", Port);
00417    Say.Emsg("do_Login:", Stype, Sname, buff);
00418    return 1;
00419 }
00420  
00421 /******************************************************************************/
00422 /*                              d o _ R m D i d                               */
00423 /******************************************************************************/
00424   
00425 void XrdCmsAdmin::do_RmDid(int isPfn)
00426 {
00427    const char *epname = "do_RmDid";
00428    char  *tp, *thePath, apath[XrdCmsMAX_PATH_LEN];
00429    int   rc;
00430 
00431    if (!(tp = Stream.GetToken()))
00432       {Say.Emsg(epname,"removed path not specified by",Stype,Sname);
00433        return;
00434       }
00435 
00436 // Handle prepare queue removal
00437 //
00438    if (PrepQ.isOK())
00439       {if (!isPfn && Config.lcl_N2N)
00440           if ((rc = Config.lcl_N2N->lfn2pfn(tp, apath, sizeof(apath))))
00441              {Say.Emsg(epname, rc, "determine pfn for removed path", tp);
00442               thePath = 0;
00443              } else thePath = apath;
00444           else thePath = tp;
00445        if (thePath) PrepQ.Gone(thePath);
00446       }
00447 
00448 // If we have a pfn then we must get the lfn to inform our manager about the file
00449 //
00450    if (isPfn && Config.lcl_N2N)
00451       {if ((rc = Config.lcl_N2N->pfn2lfn(tp, apath, sizeof(apath))))
00452           {Say.Emsg(epname, rc, "determine lfn for removed path", tp);
00453            return;
00454           } else tp = apath;
00455       }
00456 
00457    DEBUG("sending managers gone " <<tp);
00458    Manager.Inform(kYR_gone, kYR_raw, tp, strlen(tp)+1);
00459 }
00460  
00461 /******************************************************************************/
00462 /*                              d o _ R m D u d                               */
00463 /******************************************************************************/
00464   
00465 void XrdCmsAdmin::do_RmDud(int isPfn)
00466 {
00467    const char *epname = "do_RmDud";
00468    char *tp, *pp, apath[XrdCmsMAX_PATH_LEN];
00469    int   rc, Mods = kYR_raw;
00470 
00471    if (!(tp = Stream.GetToken()))
00472       {Say.Emsg(epname,"added path not specified by",Stype,Sname);
00473        return;
00474       }
00475 
00476    if ((pp = Stream.GetToken()) && *pp == 'p') Mods |= CmsHaveRequest::Pending;
00477       else Mods |= CmsHaveRequest::Online;
00478 
00479    if (isPfn && Config.lcl_N2N)
00480       {if ((rc = Config.lcl_N2N->pfn2lfn(tp, apath, sizeof(apath))))
00481           {Say.Emsg(epname, rc, "determine lfn for added path", tp);
00482            return;
00483           } else tp = apath;
00484       }
00485 
00486    DEBUG("sending managers have online " <<tp);
00487    Manager.Inform(kYR_have, Mods, tp, strlen(tp)+1);
00488 }

Generated on Tue Jul 5 14:46:22 2011 for ROOT_528-00b_version by  doxygen 1.5.1