00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 const char *XrdCmsStateCVSID = "$Id: XrdCmsState.cc 32231 2010-02-05 18:24:46Z ganis $";
00016
00017 #include <fcntl.h>
00018 #include <limits.h>
00019 #include <unistd.h>
00020 #include <netinet/in.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023
00024 #include "XProtocol/YProtocol.hh"
00025
00026 #include "Xrd/XrdLink.hh"
00027
00028 #include "XrdCms/XrdCmsManager.hh"
00029 #include "XrdCms/XrdCmsRTable.hh"
00030 #include "XrdCms/XrdCmsState.hh"
00031 #include "XrdCms/XrdCmsTrace.hh"
00032
00033 #include "XrdSys/XrdSysError.hh"
00034
00035 using namespace XrdCms;
00036
00037
00038
00039
00040
00041 XrdCmsState XrdCms::CmsState;
00042
00043
00044
00045
00046
00047 XrdCmsState::XrdCmsState() : mySemaphore(0)
00048 {
00049 minNodeCnt = 1;
00050 numActive = 0;
00051 numStaging = 0;
00052 currState = All_NoStage | All_Suspend;
00053 prevState = 0;
00054 Suspended = All_Suspend;
00055 NoStaging = All_NoStage;
00056 feOK = 0;
00057 noSpace = 0;
00058 adminNoStage = 0;
00059 adminSuspend = 0;
00060 NoStageFile = "";
00061 SuspendFile = "";
00062 isMan = 0;
00063 dataPort = 0;
00064 Enabled = 0;
00065 }
00066
00067
00068
00069
00070
00071 void XrdCmsState::Enable()
00072 {
00073 struct stat buff;
00074
00075
00076
00077 Update(Stage, stat(NoStageFile, &buff));
00078
00079
00080
00081 Update(Active, stat(SuspendFile, &buff));
00082
00083
00084
00085
00086 myMutex.Lock();
00087 Enabled = 1;
00088 prevState = ~currState;
00089 mySemaphore.Post();
00090 myMutex.UnLock();
00091 }
00092
00093
00094
00095
00096
00097 void *XrdCmsState::Monitor()
00098 {
00099 CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
00100 int RTsend, theState, Changes, myPort;
00101
00102
00103
00104 do {mySemaphore.Wait();
00105 myMutex.Lock();
00106 Changes = currState ^ prevState;
00107 theState = currState;
00108 prevState = currState;
00109 myPort = dataPort;
00110 myMutex.UnLock();
00111
00112 if (Changes && (myStatus.Hdr.modifier = Status(Changes, theState)))
00113 {if (myStatus.Hdr.modifier & CmsStatusRequest::kYR_Resume)
00114 {myStatus.Hdr.streamid = htonl(myPort); RTsend = 1;}
00115 else {myStatus.Hdr.streamid = 0;
00116 RTsend = (isMan > 0 ? (theState & SRV_Suspend) : 0);
00117 }
00118 if (isMan && RTsend)
00119 RTable.Send("status", (char *)&myStatus, sizeof(myStatus));
00120 Manager.Inform(myStatus.Hdr);
00121 }
00122 } while(1);
00123
00124
00125
00126 return (void *)0;
00127 }
00128
00129
00130
00131
00132
00133 int XrdCmsState::Port()
00134 {
00135 int xPort;
00136
00137 myMutex.Lock();
00138 xPort = dataPort;
00139 myMutex.UnLock();
00140 return xPort;
00141 }
00142
00143
00144
00145
00146
00147 void XrdCmsState::sendState(XrdLink *lp)
00148 {
00149 CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
00150
00151 myMutex.Lock();
00152 myStatus.Hdr.modifier = Suspended
00153 ? CmsStatusRequest::kYR_Suspend
00154 : CmsStatusRequest::kYR_Resume;
00155
00156 myStatus.Hdr.modifier |= NoStaging
00157 ? CmsStatusRequest::kYR_noStage
00158 : CmsStatusRequest::kYR_Stage;
00159
00160 lp->Send((char *)&myStatus.Hdr, sizeof(myStatus.Hdr));
00161 myMutex.UnLock();
00162 }
00163
00164
00165
00166
00167
00168 void XrdCmsState::Set(int ncount)
00169 {
00170
00171
00172
00173 myMutex.Lock();
00174 minNodeCnt = ncount;
00175 myMutex.UnLock();
00176 }
00177
00178
00179
00180 void XrdCmsState::Set(int ncount, int isman, const char *AdminPath)
00181 {
00182 char fnbuff[1048];
00183 int i;
00184
00185
00186
00187 minNodeCnt = ncount;
00188 isMan = isman;
00189 i = strlen(AdminPath);
00190 strcpy(fnbuff, AdminPath);
00191 if (AdminPath[i-1] != '/') fnbuff[i++] = '/';
00192 strcpy(fnbuff+i, "NOSTAGE");
00193 NoStageFile = strdup(fnbuff);
00194 strcpy(fnbuff+i, "SUSPEND");
00195 SuspendFile = strdup(fnbuff);
00196 }
00197
00198
00199
00200
00201
00202 unsigned char XrdCmsState::Status(int Changes, int theState)
00203 {
00204 const char *SRstate = 0, *SNstate = 0;
00205 unsigned char rrModifier;
00206
00207
00208
00209 if (Changes & All_Suspend)
00210 if (theState & All_Suspend)
00211 {rrModifier = CmsStatusRequest::kYR_Suspend;
00212 SRstate = "suspended";
00213 } else {
00214 rrModifier = CmsStatusRequest::kYR_Resume;
00215 SRstate = "active";
00216 }
00217 else rrModifier = 0;
00218
00219
00220
00221 if (Changes & All_NoStage)
00222 {if (theState & All_NoStage)
00223 {rrModifier |= CmsStatusRequest::kYR_noStage;
00224 SNstate = "+ nostaging";
00225 } else {
00226 rrModifier |= CmsStatusRequest::kYR_Stage;
00227 SNstate = "+ staging";
00228 }
00229 }
00230
00231
00232
00233 if (rrModifier)
00234 {if (!SRstate && SNstate) SNstate += 2;
00235 Say.Emsg("State", "Status changed to", SRstate, SNstate);
00236 }
00237 return rrModifier;
00238 }
00239
00240
00241
00242
00243
00244 void XrdCmsState::Update(StateType StateT, int ActivCnt, int StageCnt)
00245 {
00246 EPNAME("Update");
00247 const char *What;
00248 char newVal;
00249
00250
00251
00252 myMutex.Lock();
00253 switch(StateT)
00254 {case Active: if ((newVal = ActivCnt ? 0 : 1) != adminSuspend)
00255 {if (newVal) unlink(SuspendFile);
00256 else close(open(SuspendFile, O_WRONLY|O_CREAT,
00257 S_IRUSR|S_IWUSR));
00258 adminSuspend = newVal;
00259 }
00260 What = "Active";
00261 break;
00262 case Counts: numStaging += StageCnt;
00263 numActive += ActivCnt;
00264 What = "Counts";
00265 break;
00266 case FrontEnd: if ((feOK = (ActivCnt ? 1 : 0)) && StageCnt >= 0)
00267 dataPort = StageCnt;
00268 What = "FrontEnd";
00269 break;
00270 case Space: noSpace = (ActivCnt ? 0 : 1);
00271 What = "Space";
00272 break;
00273 case Stage: if ((newVal = ActivCnt ? 0 : 1) != adminNoStage)
00274 {if (newVal) unlink(NoStageFile);
00275 else close(open(NoStageFile, O_WRONLY|O_CREAT,
00276 S_IRUSR|S_IWUSR));
00277 adminNoStage = newVal;
00278 }
00279 What = "Stage";
00280 break;
00281 default: Say.Emsg("State", "Invalid state update");
00282 What = "Unknown";
00283 break;
00284 }
00285
00286 DEBUG(What <<" Parm1=" <<ActivCnt <<" Parm2=" <<StageCnt);
00287 currState=(numActive < minNodeCnt || adminSuspend ? SRV_Suspend:0)
00288 |(numStaging < 1 || noSpace || adminNoStage ? All_NoStage:0)
00289 | ( !feOK ? FES_Suspend:0);
00290
00291 Suspended = currState & All_Suspend;
00292 NoStaging = currState & All_NoStage;
00293
00294
00295
00296 if (currState != prevState && Enabled) mySemaphore.Post();
00297
00298
00299
00300 myMutex.UnLock();
00301 }