00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 const char *XrdCmsAdminCVSID = "$Id: XrdCmsAdmin.cc 34000 2010-06-21 06:49:56Z ganis $";
00016
00017 #include <stdio.h>
00018 #include <limits.h>
00019 #include <unistd.h>
00020 #include <sys/types.h>
00021
00022 #include "XProtocol/YProtocol.hh"
00023
00024 #include "XrdCms/XrdCmsAdmin.hh"
00025 #include "XrdCms/XrdCmsConfig.hh"
00026 #include "XrdCms/XrdCmsManager.hh"
00027 #include "XrdCms/XrdCmsPrepare.hh"
00028 #include "XrdCms/XrdCmsState.hh"
00029 #include "XrdCms/XrdCmsTrace.hh"
00030 #include "XrdNet/XrdNetSocket.hh"
00031 #include "XrdOuc/XrdOuca2x.hh"
00032 #include "XrdOuc/XrdOucName2Name.hh"
00033 #include "XrdSys/XrdSysError.hh"
00034 #include "XrdSys/XrdSysPlatform.hh"
00035
00036 using namespace XrdCms;
00037
00038
00039
00040
00041
00042 namespace XrdCms
00043 {
00044 class AdminReq
00045 {
00046 public:
00047
00048 AdminReq *Next;
00049 const char *Req;
00050 const char *Path;
00051 CmsRRHdr Hdr;
00052 char *Data;
00053 int Dlen;
00054 static int numinQ;
00055 static const int maxinQ = 1024;
00056
00057 static AdminReq *getReq() {AdminReq *arP;
00058 do {QPresent.Wait();
00059 QMutex.Lock();
00060 if ((arP = First))
00061 {if (!(First = arP->Next)) Last = 0;
00062 numinQ--;
00063 }
00064 QMutex.UnLock();
00065 } while (!arP);
00066 return arP;
00067 }
00068
00069 void Requeue() {QMutex.Lock();
00070 Next=First; First=this; QPresent.Post(); numinQ++;
00071 QMutex.UnLock();
00072 }
00073
00074 AdminReq(const char *req, XrdCmsRRData &RRD)
00075 : Next(0), Req(req), Path(RRD.Path ? RRD.Path : ""),
00076 Hdr(RRD.Request), Data(RRD.Buff), Dlen(RRD.Dlen)
00077 {RRD.Buff = 0;
00078 QMutex.Lock();
00079 if (Last) {Last->Next = this; Last = this;}
00080 else First=Last = this;
00081 QPresent.Post();
00082 numinQ++;
00083 QMutex.UnLock();
00084 }
00085
00086 ~AdminReq() {if (Data) free(Data);}
00087
00088 private:
00089
00090 static XrdSysSemaphore QPresent;
00091 static XrdSysMutex QMutex;
00092 static AdminReq *First;
00093 static AdminReq *Last;
00094 };
00095 };
00096
00097
00098
00099
00100
00101 XrdSysSemaphore AdminReq::QPresent(0);
00102 XrdSysMutex AdminReq::QMutex;
00103 AdminReq *AdminReq::First = 0;
00104 AdminReq *AdminReq::Last = 0;
00105 int AdminReq::numinQ= 0;
00106
00107 XrdSysMutex XrdCmsAdmin::myMutex;
00108 XrdSysSemaphore *XrdCmsAdmin::SyncUp = 0;
00109 int XrdCmsAdmin::POnline= 0;
00110
00111
00112
00113
00114
00115 void *XrdCmsAdminLogin(void *carg)
00116 {XrdCmsAdmin *Admin = new XrdCmsAdmin();
00117 Admin->Login(*(int *)carg);
00118 delete Admin;
00119 return (void *)0;
00120 }
00121
00122 void *XrdCmsAdminSend(void *carg)
00123 {XrdCmsAdmin::Relay(0,0);
00124 return (void *)0;
00125 }
00126
00127
00128
00129
00130
00131 void XrdCmsAdmin::Login(int socknum)
00132 {
00133 const char *epname = "Admin_Login";
00134 char *request, *tp;
00135
00136
00137
00138 Stream.Attach(socknum);
00139
00140
00141
00142 if ((request = Stream.GetLine()))
00143 {DEBUG("initial request: '" <<request <<"'");
00144 if (!(tp = Stream.GetToken()) || strcmp("login", tp) || !do_Login())
00145 {Say.Emsg(epname, "Invalid admin login sequence");
00146 return;
00147 }
00148 } else {Say.Emsg(epname, "No admin login specified");
00149 return;
00150 }
00151
00152
00153
00154 while((request = Stream.GetLine()))
00155 {DEBUG("received request: '" <<request <<"'");
00156 if ((tp = Stream.GetToken()))
00157 { if (!strcmp("resume", tp))
00158 CmsState.Update(XrdCmsState::Active, 1);
00159 else if (!strcmp("rmdid", tp)) do_RmDid();
00160 else if (!strcmp("newfn", tp)) do_RmDud();
00161 else if (!strcmp("suspend", tp))
00162 {CmsState.Update(XrdCmsState::Active, 0);
00163 Say.Emsg("Notes","suspend requested by",Stype,Sname);
00164 }
00165 else Say.Emsg(epname, "invalid admin request,", tp);
00166 }
00167 }
00168
00169
00170
00171 Say.Emsg("Login", Stype, Sname, "logged out");
00172
00173
00174
00175 if (Primary)
00176 {CmsState.Update(XrdCmsState::FrontEnd, 0, -1);
00177 myMutex.Lock();
00178 POnline = 0;
00179 Relay(1,-1);
00180 myMutex.UnLock();
00181 }
00182 return;
00183 }
00184
00185
00186
00187
00188
00189 void *XrdCmsAdmin::Notes(XrdNetSocket *AnoteSock)
00190 {
00191 const char *epname = "Notes";
00192 char *request, *tp;
00193 int rc;
00194
00195
00196
00197 Stream.Attach(AnoteSock->Detach());
00198 Sname = strdup("anon");
00199
00200
00201
00202 do {while((request = Stream.GetLine()))
00203 {DEBUG("received notification: '" <<request <<"'");
00204 if ((tp = Stream.GetToken()))
00205 { if (!strcmp("gone", tp)) do_RmDid(1);
00206 else if (!strcmp("rmdid", tp)) do_RmDid(0);
00207 else if (!strcmp("have", tp)) do_RmDud(1);
00208 else if (!strcmp("newfn", tp)) do_RmDud(0);
00209 else if (!strcmp("nostage", tp))
00210 {CmsState.Update(XrdCmsState::Stage, 0);
00211 Say.Emsg("Notes","nostage requested by",Stype,Sname);
00212 }
00213 else if (!strcmp("stage", tp))
00214 CmsState.Update(XrdCmsState::Stage, 1);
00215 else Say.Emsg(epname, "invalid notification,", tp);
00216 }
00217 }
00218 if ((rc = Stream.LastError())) break;
00219 rc = Stream.Detach(); Stream.Attach(rc);
00220 } while(1);
00221
00222
00223
00224 Say.Emsg(epname, rc, "accept notification");
00225 return (void *)0;
00226 }
00227
00228
00229
00230
00231
00232 void XrdCmsAdmin::Relay(int setSock, int newSock)
00233 {
00234 const char *epname = "Admin_Relay";
00235 static const int HdrSz = sizeof(CmsRRHdr);
00236 static XrdSysMutex SMutex;
00237 static XrdSysSemaphore SReady(0);
00238 static int curSock = -1;
00239 AdminReq *arP;
00240 int retc, mySock = -1;
00241
00242
00243 if (setSock)
00244 {SMutex.Lock();
00245 if (curSock >= 0) close(curSock);
00246 else if (newSock >= 0) SReady.Post();
00247 if (newSock < 0) curSock = -1;
00248 else {curSock = dup(newSock); XrdNetSocket::setOpts(curSock, 0);}
00249 SMutex.UnLock();
00250 return;
00251 }
00252
00253
00254
00255 do {while(mySock < 0)
00256 {SMutex.Lock();
00257 if (curSock < 0) {SMutex.UnLock(); SReady.Wait(); SMutex.Lock();}
00258 mySock = curSock; curSock = -1;
00259 SMutex.UnLock();
00260 }
00261
00262 do {arP = AdminReq::getReq();
00263
00264 if ((retc = write(mySock, &arP->Hdr, HdrSz)) != HdrSz
00265 || (retc = write(mySock, arP->Data, arP->Dlen)) != arP->Dlen)
00266 retc = (retc < 0 ? errno : ECANCELED);
00267 else {DEBUG("sent " <<arP->Req <<' ' <<arP->Path);
00268 delete arP; retc = 0;
00269 }
00270 } while(retc == 0);
00271
00272 if (retc) Say.Emsg("AdminRelay", retc, "relay", arP->Req);
00273 arP->Requeue();
00274 close(mySock);
00275 mySock = -1;
00276 } while(1);
00277 }
00278
00279
00280
00281
00282
00283 void XrdCmsAdmin::Send(const char *Req, XrdCmsRRData &Data)
00284 {
00285 AdminReq *arP;
00286
00287 if (AdminReq::numinQ < AdminReq::maxinQ) arP = new AdminReq(Req, Data);
00288 else Say.Emsg("Send", "Queue full; ignoring", Req, Data.Path);
00289 }
00290
00291
00292
00293
00294
00295 void *XrdCmsAdmin::Start(XrdNetSocket *AdminSock)
00296 {
00297 const char *epname = "Start";
00298 int InSock;
00299 pthread_t tid;
00300
00301
00302
00303 if (XrdSysThread::Run(&tid,XrdCmsAdminSend,(void *)0))
00304 Say.Emsg(epname, errno, "start admin relay");
00305
00306
00307
00308 if (Config.doWait)
00309 {Say.Emsg(epname, "Waiting for primary server to login.");}
00310 else if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
00311
00312
00313
00314 while(1) if ((InSock = AdminSock->Accept()) >= 0)
00315 {XrdNetSocket::setOpts(InSock, 0);
00316 if (XrdSysThread::Run(&tid,XrdCmsAdminLogin,(void *)&InSock))
00317 {Say.Emsg(epname, errno, "start admin");
00318 close(InSock);
00319 }
00320 } else Say.Emsg(epname, errno, "accept connection");
00321 return (void *)0;
00322 }
00323
00324
00325
00326
00327
00328
00329
00330
00331 int XrdCmsAdmin::do_Login()
00332 {
00333 const char *emsg;
00334 char buff[64], *tp, Ltype = 0;
00335 int Port = 0;
00336
00337
00338
00339 if (!(tp = Stream.GetToken()))
00340 {Say.Emsg("do_Login", "login type not specified");
00341 return 0;
00342 }
00343
00344 Ltype = *tp;
00345 if (*(tp+1) == '\0')
00346 switch (*tp)
00347 {case 'p': Stype = "Primary server"; break;
00348 case 'P': Stype = "Proxy server"; break;
00349 case 's': Stype = "Server"; break;
00350 case 'u': Stype = "Admin"; break;
00351 default: Ltype = 0; break;
00352 }
00353
00354 if (!Ltype)
00355 {Say.Emsg("do_Login", "Invalid login type,", tp);
00356 return 0;
00357 } else Ltype = *tp;
00358
00359 if (!(tp = Stream.GetToken()))
00360 {Say.Emsg("do_Login", "login name not specified");
00361 return 0;
00362 } else Sname = strdup(tp);
00363
00364
00365
00366 while((tp = Stream.GetToken()))
00367 { if (!strcmp(tp, "port"))
00368 {if (!(tp = Stream.GetToken()))
00369 {Say.Emsg("do_Login", "login port not specified");
00370 return 0;
00371 }
00372 if (XrdOuca2x::a2i(Say,"login port",tp,&Port,0))
00373 return 0;
00374 }
00375 else {Say.Emsg("do_Login", "invalid login option -", tp);
00376 return 0;
00377 }
00378 }
00379
00380
00381
00382
00383 if (Ltype != 'p' && Ltype != 'P') return 1;
00384 if (Ltype == 'p' && Config.asProxy()) emsg = "only accepts proxies";
00385 else if (Ltype == 'P' && !Config.asProxy()) emsg = "does not accept proxies";
00386 else emsg = 0;
00387 if (emsg)
00388 {Say.Emsg("do_login", "Server login rejected; configured role", emsg);
00389 return 0;
00390 }
00391
00392
00393
00394 myMutex.Lock();
00395 if (POnline)
00396 {myMutex.UnLock();
00397 Say.Emsg("do_Login", "Primary server already logged in; login of",
00398 tp, "rejected.");
00399 return 0;
00400 }
00401
00402
00403
00404 Primary = 1;
00405 POnline = 1;
00406 Relay(1, Stream.FDNum());
00407 CmsState.Update(XrdCmsState::FrontEnd, 1, Port);
00408
00409
00410
00411 if (SyncUp) {SyncUp->Post(); SyncUp = 0;}
00412 myMutex.UnLock();
00413
00414
00415
00416 sprintf(buff, "logged in; data port is %d", Port);
00417 Say.Emsg("do_Login:", Stype, Sname, buff);
00418 return 1;
00419 }
00420
00421
00422
00423
00424
00425 void XrdCmsAdmin::do_RmDid(int isPfn)
00426 {
00427 const char *epname = "do_RmDid";
00428 char *tp, *thePath, apath[XrdCmsMAX_PATH_LEN];
00429 int rc;
00430
00431 if (!(tp = Stream.GetToken()))
00432 {Say.Emsg(epname,"removed path not specified by",Stype,Sname);
00433 return;
00434 }
00435
00436
00437
00438 if (PrepQ.isOK())
00439 {if (!isPfn && Config.lcl_N2N)
00440 if ((rc = Config.lcl_N2N->lfn2pfn(tp, apath, sizeof(apath))))
00441 {Say.Emsg(epname, rc, "determine pfn for removed path", tp);
00442 thePath = 0;
00443 } else thePath = apath;
00444 else thePath = tp;
00445 if (thePath) PrepQ.Gone(thePath);
00446 }
00447
00448
00449
00450 if (isPfn && Config.lcl_N2N)
00451 {if ((rc = Config.lcl_N2N->pfn2lfn(tp, apath, sizeof(apath))))
00452 {Say.Emsg(epname, rc, "determine lfn for removed path", tp);
00453 return;
00454 } else tp = apath;
00455 }
00456
00457 DEBUG("sending managers gone " <<tp);
00458 Manager.Inform(kYR_gone, kYR_raw, tp, strlen(tp)+1);
00459 }
00460
00461
00462
00463
00464
00465 void XrdCmsAdmin::do_RmDud(int isPfn)
00466 {
00467 const char *epname = "do_RmDud";
00468 char *tp, *pp, apath[XrdCmsMAX_PATH_LEN];
00469 int rc, Mods = kYR_raw;
00470
00471 if (!(tp = Stream.GetToken()))
00472 {Say.Emsg(epname,"added path not specified by",Stype,Sname);
00473 return;
00474 }
00475
00476 if ((pp = Stream.GetToken()) && *pp == 'p') Mods |= CmsHaveRequest::Pending;
00477 else Mods |= CmsHaveRequest::Online;
00478
00479 if (isPfn && Config.lcl_N2N)
00480 {if ((rc = Config.lcl_N2N->pfn2lfn(tp, apath, sizeof(apath))))
00481 {Say.Emsg(epname, rc, "determine lfn for added path", tp);
00482 return;
00483 } else tp = apath;
00484 }
00485
00486 DEBUG("sending managers have online " <<tp);
00487 Manager.Inform(kYR_have, Mods, tp, strlen(tp)+1);
00488 }