XrdCmsState.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                        X r d C m s S t a t e . c c                         */
00004 /*                                                                            */
00005 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //         $Id: XrdCmsState.cc 32231 2010-02-05 18:24:46Z ganis $
00012 
00013 // Original Version: 1.3 2006/04/05 02:28:09 abh
00014 
// Revision-control identification string for this source file (an expanded
// "Id" keyword).
00015 const char *XrdCmsStateCVSID = "$Id: XrdCmsState.cc 32231 2010-02-05 18:24:46Z ganis $";
00016 
00017 #include <fcntl.h>
00018 #include <limits.h>
00019 #include <unistd.h>
00020 #include <netinet/in.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023 
00024 #include "XProtocol/YProtocol.hh"
00025 
00026 #include "Xrd/XrdLink.hh"
00027 
00028 #include "XrdCms/XrdCmsManager.hh"
00029 #include "XrdCms/XrdCmsRTable.hh"
00030 #include "XrdCms/XrdCmsState.hh"
00031 #include "XrdCms/XrdCmsTrace.hh"
00032 
00033 #include "XrdSys/XrdSysError.hh"
00034 
00035 using namespace XrdCms;
00036  
00037 /******************************************************************************/
00038 /*                               G l o b a l s                                */
00039 /******************************************************************************/
00040   
// Definition of the CmsState object that lives in the XrdCms namespace.
00041 XrdCmsState XrdCms::CmsState;
00042 
00043 /******************************************************************************/
00044 /*                           C o n s t r u c t o r                            */
00045 /******************************************************************************/
00046   
00047 XrdCmsState::XrdCmsState() : mySemaphore(0)
00048 {
00049    minNodeCnt   = 1;
00050    numActive    = 0;
00051    numStaging   = 0;
00052    currState    = All_NoStage | All_Suspend;
00053    prevState    = 0;
00054    Suspended    = All_Suspend;
00055    NoStaging    = All_NoStage;
00056    feOK         = 0;
00057    noSpace      = 0;
00058    adminNoStage = 0;
00059    adminSuspend = 0;
00060    NoStageFile  = "";
00061    SuspendFile  = "";
00062    isMan        = 0;
00063    dataPort     = 0;
00064    Enabled      = 0;
00065 }
00066  
00067 /******************************************************************************/
00068 /* Public                         E n a b l e                                 */
00069 /******************************************************************************/
00070   
00071 void XrdCmsState::Enable()
00072 {
00073    struct stat buff;
00074 
00075 // Set correct admin staging state
00076 //
00077    Update(Stage, stat(NoStageFile, &buff));
00078 
00079 // Set correct admin suspend state
00080 //
00081    Update(Active, stat(SuspendFile, &buff));
00082 
00083 // We will force the information to be sent to interested parties by making
00084 // the previous state different from the current state and enabling ourselves.
00085 //
00086    myMutex.Lock();
00087    Enabled = 1;
00088    prevState = ~currState;
00089    mySemaphore.Post();
00090    myMutex.UnLock();
00091 }
00092 
00093 /******************************************************************************/
00094 /* Public                        M o n i t o r                                */
00095 /******************************************************************************/
00096   
00097 void *XrdCmsState::Monitor()
00098 {
00099    CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
00100    int RTsend, theState, Changes, myPort;
00101 
00102 // Do this forever (we are only posted when finally enabled)
00103 //
00104    do {mySemaphore.Wait();
00105        myMutex.Lock(); 
00106        Changes   = currState ^ prevState;
00107        theState  = currState;
00108        prevState = currState;
00109        myPort    = dataPort;
00110        myMutex.UnLock();
00111 
00112        if (Changes && (myStatus.Hdr.modifier = Status(Changes, theState)))
00113           {if (myStatus.Hdr.modifier & CmsStatusRequest::kYR_Resume)
00114                     {myStatus.Hdr.streamid = htonl(myPort); RTsend = 1;}
00115                else {myStatus.Hdr.streamid = 0;
00116                      RTsend = (isMan > 0 ? (theState & SRV_Suspend) : 0);
00117                     }
00118            if (isMan && RTsend)
00119               RTable.Send("status", (char *)&myStatus, sizeof(myStatus));
00120            Manager.Inform(myStatus.Hdr);
00121           }
00122       } while(1);
00123 
00124 // All done
00125 //
00126    return (void *)0;
00127 }
00128   
00129 /******************************************************************************/
00130 /* Public                           P o r t                                   */
00131 /******************************************************************************/
00132   
00133 int XrdCmsState::Port()
00134 {
00135     int xPort;
00136 
00137     myMutex.Lock(); 
00138     xPort = dataPort;
00139     myMutex.UnLock();
00140     return xPort;
00141 }
00142 
00143 /******************************************************************************/
00144 /* Public                      s e n d S t a t e                              */
00145 /******************************************************************************/
00146   
00147 void XrdCmsState::sendState(XrdLink *lp)
00148 {
00149    CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
00150 
00151    myMutex.Lock();
00152    myStatus.Hdr.modifier  = Suspended
00153                           ? CmsStatusRequest::kYR_Suspend
00154                           : CmsStatusRequest::kYR_Resume;
00155 
00156    myStatus.Hdr.modifier |= NoStaging
00157                           ? CmsStatusRequest::kYR_noStage
00158                           : CmsStatusRequest::kYR_Stage;
00159 
00160    lp->Send((char *)&myStatus.Hdr, sizeof(myStatus.Hdr));
00161    myMutex.UnLock();
00162 }
00163 
00164 /******************************************************************************/
00165 /* Public                            S e t                                    */
00166 /******************************************************************************/
00167   
00168 void XrdCmsState::Set(int ncount)
00169 {
00170 
00171 // Set the node count (this requires a lock)
00172 //
00173    myMutex.Lock(); 
00174    minNodeCnt = ncount;
00175    myMutex.UnLock();
00176 }
00177 
00178 /******************************************************************************/
00179 
00180 void XrdCmsState::Set(int ncount, int isman, const char *AdminPath)
00181 {
00182    char fnbuff[1048];
00183    int i;
00184 
00185 // This is a configuration call no locks are required.
00186 //
00187    minNodeCnt = ncount;
00188    isMan = isman;
00189    i = strlen(AdminPath);
00190    strcpy(fnbuff, AdminPath);
00191    if (AdminPath[i-1] != '/') fnbuff[i++] = '/';
00192    strcpy(fnbuff+i, "NOSTAGE");
00193    NoStageFile = strdup(fnbuff);
00194    strcpy(fnbuff+i, "SUSPEND");
00195    SuspendFile = strdup(fnbuff);
00196 }
00197 
00198 /******************************************************************************/
00199 /* Private                        S t a t u s                                 */
00200 /******************************************************************************/
00201   
00202 unsigned char XrdCmsState::Status(int Changes, int theState)
00203 {
00204    const char *SRstate = 0, *SNstate = 0;
00205    unsigned char rrModifier;
00206 
00207 // Check for suspend changes
00208 //
00209    if (Changes & All_Suspend)
00210       if (theState & All_Suspend)
00211          {rrModifier = CmsStatusRequest::kYR_Suspend;
00212           SRstate = "suspended";
00213          } else {
00214           rrModifier = CmsStatusRequest::kYR_Resume;
00215           SRstate = "active";
00216          }
00217       else rrModifier = 0;
00218 
00219 // Check for staging changes
00220 //
00221    if (Changes & All_NoStage)
00222       {if (theState & All_NoStage)
00223           {rrModifier |= CmsStatusRequest::kYR_noStage;
00224            SNstate = "+ nostaging";
00225           } else {
00226            rrModifier |= CmsStatusRequest::kYR_Stage;
00227            SNstate = "+ staging";
00228           }
00229       }
00230 
00231 // Report and return status
00232 //
00233    if (rrModifier) 
00234       {if (!SRstate && SNstate) SNstate += 2;
00235        Say.Emsg("State", "Status changed to", SRstate, SNstate);
00236       }
00237    return rrModifier;
00238 }
00239  
00240 /******************************************************************************/
00241 /* Public                         U p d a t e                                 */
00242 /******************************************************************************/
00243 
00244 void XrdCmsState::Update(StateType StateT, int ActivCnt, int StageCnt)
00245 {
00246   EPNAME("Update");
00247   const char *What;
00248   char newVal;
00249 
00250 // Create new state
00251 //
00252    myMutex.Lock();
00253    switch(StateT)
00254          {case Active:   if ((newVal = ActivCnt ? 0 : 1) != adminSuspend)
00255                             {if (newVal) unlink(SuspendFile);
00256                                 else close(open(SuspendFile, O_WRONLY|O_CREAT,
00257                                                              S_IRUSR|S_IWUSR));
00258                              adminSuspend = newVal;
00259                             }
00260                          What = "Active";
00261                          break;
00262           case Counts:   numStaging += StageCnt;
00263                          numActive  += ActivCnt;
00264                          What = "Counts";
00265                          break;
00266           case FrontEnd: if ((feOK = (ActivCnt ? 1 : 0)) && StageCnt >= 0)
00267                             dataPort = StageCnt;
00268                          What = "FrontEnd";
00269                          break;
00270           case Space:    noSpace = (ActivCnt ? 0 : 1);
00271                          What = "Space";
00272                          break;
00273           case Stage:    if ((newVal = ActivCnt ? 0 : 1) != adminNoStage)
00274                             {if (newVal) unlink(NoStageFile);
00275                                 else close(open(NoStageFile, O_WRONLY|O_CREAT,
00276                                                              S_IRUSR|S_IWUSR));
00277                              adminNoStage = newVal;
00278                             }
00279                          What = "Stage";
00280                          break;
00281           default:       Say.Emsg("State", "Invalid state update");
00282                          What = "Unknown";
00283                          break;
00284          }
00285 
00286    DEBUG(What <<" Parm1=" <<ActivCnt <<" Parm2=" <<StageCnt);
00287    currState=(numActive  < minNodeCnt   || adminSuspend ? SRV_Suspend:0)
00288             |(numStaging < 1 || noSpace || adminNoStage ? All_NoStage:0)
00289             |                                   ( !feOK ? FES_Suspend:0);
00290 
00291    Suspended = currState & All_Suspend;
00292    NoStaging = currState & All_NoStage;
00293 
00294 // If any changes are noted then we must send out notifications
00295 //
00296    if (currState != prevState && Enabled) mySemaphore.Post();
00297 
00298 // All done
00299 //
00300    myMutex.UnLock();
00301 }

Generated on Tue Jul 5 14:46:31 2011 for ROOT_528-00b_version by  doxygen 1.5.1