00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdXrootdAdminCVSID = "$Id: XrdXrootdAdmin.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <fcntl.h>
00016 #include <stdlib.h>
00017 #include <string.h>
00018 #include <unistd.h>
00019 #include <netinet/in.h>
00020 #include <sys/types.h>
00021
00022 #include "XrdVersion.hh"
00023 #include "Xrd/XrdLink.hh"
00024 #include "XrdNet/XrdNetSocket.hh"
00025 #include "XrdSys/XrdSysError.hh"
00026 #include "XrdSys/XrdSysPlatform.hh"
00027 #include "XrdSys/XrdSysPthread.hh"
00028 #include "XrdOuc/XrdOucTList.hh"
00029 #include "XrdXrootd/XrdXrootdAdmin.hh"
00030 #include "XrdXrootd/XrdXrootdJob.hh"
00031 #include "XrdXrootd/XrdXrootdProtocol.hh"
00032 #include "XrdXrootd/XrdXrootdTrace.hh"
00033
00034
00035
00036
00037
// Trace control object defined in another translation unit.
// NOTE(review): presumably consulted by the TRACE() macro used below -- confirm.
00038 extern XrdOucTrace *XrdXrootdTrace;
00039
// Error/message router shared by every admin object; assigned in Init().
00040 XrdSysError *XrdXrootdAdmin::eDest;
00041
// Head of the singly linked list of registered jobs; addJob() pushes new
// entries on the front, do_Cj()/do_Lsj() walk it.
00042 XrdXrootdAdmin::JobTable *XrdXrootdAdmin::JobList = 0;
00043
00044
00045
00046
00047
00048 void *XrdXrootdInitAdmin(void *carg)
00049 {XrdXrootdAdmin Admin;
00050 return Admin.Start((XrdNetSocket *)carg);
00051 }
00052
00053 void *XrdXrootdLoginAdmin(void *carg)
00054 {XrdXrootdAdmin *Admin = new XrdXrootdAdmin();
00055 Admin->Login(*(int *)carg);
00056 delete Admin;
00057 return (void *)0;
00058 }
00059
00060
00061
00062
00063
00064 void XrdXrootdAdmin::addJob(const char *jname, XrdXrootdJob *jp)
00065 {
00066 JobTable *jTabp = new JobTable();
00067
00068 jTabp->Jname = strdup(jname);
00069 jTabp->Job = jp;
00070 jTabp->Next = JobList;
00071 JobList = jTabp;
00072 }
00073
00074
00075
00076
00077
00078 int XrdXrootdAdmin::Init(XrdSysError *erp, XrdNetSocket *asock)
00079 {
00080 const char *epname = "Init";
00081 pthread_t tid;
00082
00083 eDest = erp;
00084 if (XrdSysThread::Run(&tid, XrdXrootdInitAdmin, (void *)asock,
00085 0, "Admin traffic"))
00086 {eDest->Emsg(epname, errno, "start admin");
00087 return 0;
00088 }
00089 return 1;
00090 }
00091
00092
00093
00094
00095
00096 void XrdXrootdAdmin::Login(int socknum)
00097 {
00098 const char *epname = "Admin";
00099 char *tp;
00100
00101
00102
00103 Stream.SetEroute(eDest);
00104 Stream.AttachIO(socknum, socknum);
00105
00106
00107
00108 if (!Stream.GetLine())
00109 {eDest->Emsg(epname, "No admin login specified");
00110 return;
00111 }
00112
00113
00114
00115 if (getreqID()
00116 || !(tp = Stream.GetToken())
00117 || strcmp("login", tp)
00118 || do_Login())
00119 {eDest->Emsg(epname, "Invalid admin login sequence");
00120 return;
00121 }
00122
00123
00124
00125 eDest->Emsg(epname, "Admin", TraceID, "logged in");
00126 Xeq();
00127 }
00128
00129
00130
00131
00132
00133 void *XrdXrootdAdmin::Start(XrdNetSocket *AdminSock)
00134 {
00135 const char *epname = "Start";
00136 int InSock;
00137 pthread_t tid;
00138
00139
00140
00141 while(1) if ((InSock = AdminSock->Accept()) >= 0)
00142 {if (XrdSysThread::Run(&tid,XrdXrootdLoginAdmin,(void *)&InSock))
00143 {eDest->Emsg(epname, errno, "start admin");
00144 close(InSock);
00145 }
00146 } else eDest->Emsg(epname, errno, "accept connection");
00147 return (void *)0;
00148 }
00149
00150
00151
00152
00153
00154
00155
00156
00157 int XrdXrootdAdmin::do_Abort()
00158 {
00159 char *msg;
00160 int mlen, rc;
00161
00162
00163
00164 if ((rc = getTarget("abort", &msg))) return 0;
00165
00166
00167
00168 msg = getMsg(msg, mlen);
00169
00170
00171
00172 if (msg) return sendResp("abort", kXR_asyncab, msg, mlen);
00173 return sendResp("abort", kXR_asyncab);
00174 }
00175
00176
00177
00178
00179
00180 int XrdXrootdAdmin::do_Cj()
00181 {
00182 const char *fmt1 = "<resp id=\"%s\"><rc>0</rc>";
00183 const char *fmt2 = "<num>%d</num></resp>\n";
00184 char *tp, buff[1024];
00185 XrdXrootdJob *jobp;
00186 JobTable *jTabp;
00187 int i, rc;
00188
00189
00190
00191 if (!(tp = Stream.GetToken()))
00192 {sendErr(8, "cj", "job type not specified.");
00193 return -1;
00194 }
00195
00196
00197
00198 jTabp = JobList;
00199 while(jTabp && strcmp(tp, jTabp->Jname)) jTabp = jTabp->Next;
00200
00201
00202
00203 if (jTabp) jobp = jTabp->Job;
00204 else if (!strcmp(tp, "*")) jobp = 0;
00205 else {sendErr(8, "cj", "invalid job type specified.");
00206 return -1;
00207 }
00208
00209
00210
00211 tp = Stream.GetToken();
00212
00213
00214
00215 i = sprintf(buff, fmt1, reqID);
00216 if (Stream.Put(buff, i)) return -1;
00217
00218
00219
00220 if (jobp) rc = jobp->Cancel(tp);
00221 else {jTabp = JobList; rc = 0;
00222 while(jTabp) {rc += jTabp->Job->Cancel(tp); jTabp = jTabp->Next;}
00223 }
00224
00225
00226
00227 i = sprintf(buff, fmt2, rc);
00228 return Stream.Put(buff, i);
00229 }
00230
00231
00232
00233
00234
00235 int XrdXrootdAdmin::do_Cont()
00236 {
00237 int rc;
00238
00239
00240
00241 if ((rc = getTarget("cont"))) return 0;
00242
00243
00244
00245 return sendResp("cont", kXR_asyncgo);
00246 }
00247
00248
00249
00250
00251
00252 int XrdXrootdAdmin::do_Disc()
00253 {
00254 kXR_int32 msg[2];
00255 char *tp;
00256 int rc;
00257
00258
00259
00260 if ((rc = getTarget("disc"))) return 0;
00261
00262
00263
00264 if (!(tp = Stream.GetToken()) || !(msg[0] = strtol(tp, 0, 10)))
00265 return sendErr(8, "disc", " reconnect interval missing or invalid.");
00266 if (!(tp = Stream.GetToken()) || !(msg[1] = strtol(tp, 0, 10)))
00267 return sendErr(8, "disc", "reconnect timeout missing or invalid.");
00268
00269
00270
00271 msg[0] = htonl(msg[0]); msg[1] = htonl(msg[1]);
00272 return sendResp("disc", kXR_asyncdi, (const char *)msg, sizeof(msg));
00273 }
00274
00275
00276
00277
00278
00279 int XrdXrootdAdmin::do_Login()
00280 {
00281 const char *fmt="<resp id=\"%s\"><rc>0</rc><v>" XROOTD_VERSION "</v></resp>\n";
00282 char *tp, buff[1024];
00283 int blen;
00284
00285
00286
00287 if (!(tp = Stream.GetToken()))
00288 {eDest->Emsg("do_Login", "login name not specified");
00289 return 0;
00290 } else strlcpy(TraceID, tp, sizeof(TraceID));
00291
00292
00293
00294 blen = snprintf(buff, sizeof(buff)-1, fmt, reqID);
00295 buff[sizeof(buff)-1] = '\0';
00296 return Stream.Put(buff, blen);
00297 }
00298
00299
00300
00301
00302
00303 int XrdXrootdAdmin::do_Lsc()
00304 {
00305 const char *fmt1 = "<resp id=\"%s\"><rc>0</rc><conn>";
00306 const char *fmt2 = "</conn></resp>\n";
00307 static int fmt2len = strlen(fmt2);
00308 char buff[1024];
00309 const char *mdat[3] = {buff, " ", 0};
00310 int mlen[3] = {0, 1, 0};
00311 int i, rc, curr = -1;
00312
00313
00314
00315 if ((rc = getTarget("lsc"))) return 0;
00316
00317
00318
00319 i = sprintf(buff, fmt1, reqID);
00320 if (Stream.Put(buff, i)) return -1;
00321
00322
00323
00324 while((mlen[0] = XrdLink::getName(curr, buff, sizeof(buff), &Target)))
00325 if (Stream.Put(mdat, mlen)) return -1;
00326 return Stream.Put(fmt2, fmt2len);
00327 }
00328
00329
00330
00331
00332
00333 int XrdXrootdAdmin::do_Lsd()
00334 {
00335 const char *fmt1 = "<resp id=\"%s\"><rc>0</rc>";
00336 const char *fmt2 = "<c r=\"%c\" t=\"%lld\" v=\"%d\" m=\"%s\">";
00337 const char *fmt2a= "<io u=\"%d\"><nf>%d</nf><p>%lld<n>%d</n></p>"
00338 "<i>%lld<n>%d</n></i><o>%lld<n>%d</n></o>"
00339 "<s>%d</s><t>%d</t></io>";
00340 const char *fmt3 = "<auth p=\"%s\"><n>";
00341 const char *fmt3e= "</r></auth>";
00342 const char *fmt4 = "</resp>\n";
00343 static int fmt3elen= strlen(fmt3e);
00344 static int fmt4len = strlen(fmt4);
00345 char ctyp, monit[3], *mm, cname[1024], buff[100];
00346 char aprot[XrdSecPROTOIDSIZE+2], abuff[32], iobuff[256];
00347 const char *mdat[24]= {buff, cname, iobuff};
00348 int mlen[24]= {0};
00349 long long conn, inBytes, outBytes;
00350 int i, rc, cver, inuse, stalls, tardies, curr = -1;
00351 XrdLink *lp;
00352 XrdProtocol *xp;
00353 XrdXrootdProtocol *pp;
00354
00355
00356
00357 if ((rc = getTarget("lsd"))) return 0;
00358
00359
00360
00361 i = sprintf(buff, fmt1, reqID);
00362 if (Stream.Put(buff, i)) return -1;
00363
00364
00365
00366 while((lp = XrdLink::Find(curr, &Target)))
00367 if ((xp = lp->getProtocol())
00368 && (pp = dynamic_cast<XrdXrootdProtocol *>(xp)))
00369 {cver = int(pp->CapVer);
00370 ctyp = (pp->Status & XRD_ADMINUSER ? 'a' : 'u');
00371 conn = static_cast<long long>(lp->timeCon());
00372 mm = monit;
00373 if (pp->monFILE) *mm++ = 'f';
00374 if (pp->monIO ) *mm++ = 'i';
00375 *mm = '\0';
00376 inuse = lp->getIOStats(inBytes, outBytes, stalls, tardies);
00377 mlen[0] = sprintf(buff, fmt2, ctyp, conn, cver, monit);
00378 mlen[1] = lp->Client(cname, sizeof(cname));
00379 mlen[2] = sprintf(iobuff, fmt2a,inuse-1,pp->numFiles,pp->totReadP,
00380 (pp->cumReadP + pp->numReadP),
00381 inBytes, (pp->cumWrites+ pp->numWrites),
00382 outBytes,(pp->cumReads + pp->numReads),
00383 stalls, tardies);
00384 i = 3;
00385 if ((pp->Client) && pp->Client != &(pp->Entity))
00386 {strncpy(aprot, pp->Client->prot, XrdSecPROTOIDSIZE);
00387 aprot[XrdSecPROTOIDSIZE] = '\0';
00388 mdat[i] = abuff;
00389 mlen[i++]= sprintf(abuff, fmt3, aprot);
00390 i = 1;
00391 if (pp->Client->name && (mlen[i] = strlen(pp->Client->name)))
00392 mdat[i++] = pp->Client->name;
00393 mdat[i] = "</n><h>"; mlen[i++] = 7;
00394 if (pp->Client->host && (mlen[i] = strlen(pp->Client->host)))
00395 mdat[i++] = pp->Client->host;
00396 mdat[i] = "</h><o>"; mlen[i++] = 7;
00397 if (pp->Client->vorg && (mlen[i] = strlen(pp->Client->vorg)))
00398 mdat[i++] = pp->Client->vorg;
00399 mdat[i] = "</o><r>"; mlen[i++] = 7;
00400 if (pp->Client->role && (mlen[i] = strlen(pp->Client->role)))
00401 mdat[i++] = pp->Client->role;
00402 mdat[i] = fmt3e; mlen[i++] = fmt3elen;
00403 }
00404 mdat[i] = "</c>"; mlen[i++] = 4;
00405 mdat[i] = 0; mlen[i] = 0;
00406 if (Stream.Put(mdat, mlen)) {lp->setRef(-1); return -1;}
00407 }
00408 return Stream.Put(fmt4, fmt4len);
00409 }
00410
00411
00412
00413
00414
00415 int XrdXrootdAdmin::do_Lsj()
00416 {
00417 const char *fmt1 = "<resp id=\"%s\"><rc>0</rc>";
00418 const char *fmt2 = "</resp>\n";
00419 static int fmt2len = strlen(fmt2);
00420 char *tp, buff[1024];
00421 XrdXrootdJob *jobp;
00422 JobTable *jTabp;
00423 int i, rc = 0;
00424
00425
00426
00427 if (!(tp = Stream.GetToken()))
00428 {sendErr(8, "lsj", "job type not specified.");
00429 return -1;
00430 }
00431
00432
00433
00434 jTabp = JobList;
00435 while(jTabp && strcmp(tp, jTabp->Jname)) jTabp = jTabp->Next;
00436
00437
00438
00439 if (jTabp) jobp = jTabp->Job;
00440 else if (!strcmp(tp, "*")) jobp = 0;
00441 else {sendErr(8, "lsj", "invalid job type specified.");
00442 return -1;
00443 }
00444
00445
00446
00447 i = sprintf(buff, fmt1, reqID);
00448 if (Stream.Put(buff, i)) return -1;
00449
00450
00451
00452 if (jobp) rc = do_Lsj_Xeq(jobp);
00453 else {jTabp = JobList;
00454 while(jTabp && !(rc = do_Lsj_Xeq(jTabp->Job))) jTabp = jTabp->Next;
00455 }
00456
00457
00458
00459 return (rc ? rc : Stream.Put(fmt2, fmt2len));
00460 }
00461
00462
00463
00464
00465
00466 int XrdXrootdAdmin::do_Lsj_Xeq(XrdXrootdJob *jp)
00467 {
00468 XrdOucTList *tp, *tpprev;
00469 int rc = 0;
00470
00471 if ((tp = jp->List()))
00472 while(tp && !(rc = Stream.Put(tp->text, tp->val)))
00473 {tpprev = tp; tp = tp->next; delete tpprev;}
00474
00475 while(tp) {tpprev = tp; tp = tp->next; delete tpprev;}
00476
00477 return rc;
00478 }
00479
00480
00481
00482
00483
00484 int XrdXrootdAdmin::do_Msg()
00485 {
00486 char *msg;
00487 int rc, mlen;
00488
00489
00490
00491 if ((rc = getTarget("msg", &msg))) return 0;
00492
00493
00494
00495 msg = getMsg(msg, mlen);
00496
00497
00498 if (msg) return sendResp("msg", kXR_asyncms, msg, mlen);
00499 return sendResp("msg", kXR_asyncms);
00500 }
00501
00502
00503
00504
00505
00506 int XrdXrootdAdmin::do_Pause()
00507 {
00508 kXR_int32 msg;
00509 char *tp;
00510 int rc;
00511
00512
00513
00514 if ((rc = getTarget("pause"))) return 0;
00515
00516
00517
00518 if (!(tp = Stream.GetToken()) || !(msg = strtol(tp, 0, 10)))
00519 return sendErr(8, "pause", "time missing or invalid.");
00520
00521
00522
00523 msg = htonl(msg);
00524 return sendResp("pause", kXR_asyncwt, (const char *)&msg, sizeof(msg));
00525 }
00526
00527
00528
00529
00530
00531 int XrdXrootdAdmin::do_Red()
00532 {
00533 struct msg {kXR_int32 port; char buff[8192];} myMsg;
00534 int rc, hlen, tlen, bsz;
00535 char *tp, *pn, *qq;
00536
00537
00538
00539 if ((rc = getTarget("redirect", 0))) return 0;
00540
00541
00542
00543 if (!(tp = Stream.GetToken()) || *tp == ':')
00544 return sendErr(8, "redirect", "destination host not specified.");
00545
00546
00547
00548 if (!(pn = index(tp, ':')) || !(myMsg.port = strtol(pn+1, &qq, 10)))
00549 return sendErr(8, "redirect", "port missing or invalid.");
00550 myMsg.port = htonl(myMsg.port);
00551
00552
00553
00554 *pn = '\0';
00555 hlen = strlcpy(myMsg.buff,tp,sizeof(myMsg.buff));
00556 if (static_cast<size_t>(hlen) >= sizeof(myMsg.buff))
00557 return sendErr(8, "redirect", "destination host too long.");
00558
00559
00560
00561 if (qq && *qq == '?')
00562 {bsz = sizeof(myMsg.buff) - hlen;
00563 if ((tlen = strlcpy(myMsg.buff+hlen,qq,bsz)) >= bsz)
00564 return sendErr(8, "redirect", "token too long.");
00565 } else tlen = 0;
00566
00567
00568
00569 return sendResp("redirect", kXR_asyncrd, (const char *)&myMsg, hlen+tlen+4);
00570 }
00571
00572
00573
00574
00575
00576 char *XrdXrootdAdmin::getMsg(char *msg, int &mlen)
00577 {
00578 if (msg) while(*msg == ' ') msg++;
00579 if (msg && *msg) mlen = strlen(msg)+1;
00580 else {msg = 0; mlen = 0;}
00581 return msg;
00582 }
00583
00584
00585
00586
00587
00588 int XrdXrootdAdmin::getreqID()
00589 {
00590 char *tp;
00591
00592 if (!(tp = Stream.GetToken()))
00593 {reqID[0] = '?'; reqID[1] = '\0';
00594 return sendErr(4, "request", "id not specified.");
00595 }
00596
00597 if (strlen(tp) >= sizeof(reqID))
00598 {reqID[0] = '?'; reqID[1] = '\0';
00599 return sendErr(4, "request", "id too long.");
00600 }
00601
00602 strcpy(reqID, tp);
00603 return 0;
00604 }
00605
00606
00607
00608
00609
00610
00611 int XrdXrootdAdmin::getTarget(const char *act, char **rest)
00612 {
00613 char *tp;
00614
00615
00616
00617 if (!(tp = Stream.GetToken(rest)))
00618 {sendErr(8, act, "target not specified.");
00619 return -1;
00620 }
00621 Target.Set(tp);
00622
00623 return 0;
00624 }
00625
00626
00627
00628
00629
00630 int XrdXrootdAdmin::sendErr(int rc, const char *act, const char *msg)
00631 {
00632 const char *fmt = "<resp id=\"%s\"><rc>%d</rc><msg>%s %s</msg></resp>\n";
00633 char buff[1024];
00634 int blen;
00635
00636 blen = snprintf(buff, sizeof(buff)-1, fmt, reqID, rc, act, msg);
00637 buff[sizeof(buff)-1] = '\0';
00638
00639 return Stream.Put(buff, blen);
00640 }
00641
00642
00643
00644
00645
00646 int XrdXrootdAdmin::sendOK(int sent)
00647 {
00648 const char *fmt = "<resp id=\"%s\"><rc>0</rc><num>%d</num></resp>\n";
00649 char buff[1024];
00650 int blen;
00651
00652 blen = snprintf(buff, sizeof(buff)-1, fmt, reqID, sent);
00653 buff[sizeof(buff)-1] = '\0';
00654
00655 return Stream.Put(buff, blen);
00656 }
00657
00658
00659
00660
00661
00662 int XrdXrootdAdmin::sendResp(const char *act, XActionCode anum)
00663 {
00664 XrdLink *lp;
00665 const kXR_int32 net4 = htonl(4);
00666 int numsent = 0, curr = -1;
00667
00668
00669
00670 usResp.act = htonl(anum);
00671 usResp.len = net4;
00672
00673
00674
00675 while((lp = XrdLink::Find(curr, &Target)))
00676 {TRACE(RSP, "sending " <<lp->ID <<' ' <<act);
00677 if (lp->Send((const char *)&usResp, sizeof(usResp))>0) numsent++;
00678 }
00679
00680
00681
00682 return sendOK(numsent);
00683 }
00684
00685
00686
00687 int XrdXrootdAdmin::sendResp(const char *act, XActionCode anum,
00688 const char *msg, int msgl)
00689 {
00690 struct iovec iov[2];
00691 XrdLink *lp;
00692 int numsent = 0, curr = -1, bytes = sizeof(usResp)+msgl;
00693
00694
00695
00696 usResp.act = htonl(anum);
00697 usResp.len = htonl(msgl+4);
00698
00699
00700
00701 iov[0].iov_base = (caddr_t)&usResp;
00702 iov[0].iov_len = sizeof(usResp);
00703 iov[1].iov_base = (caddr_t)msg;
00704 iov[1].iov_len = msgl;
00705
00706
00707
00708 while((lp = XrdLink::Find(curr, &Target)))
00709 {TRACE(RSP, "sending " <<lp->ID <<' ' <<act <<' ' <<msg);
00710 if (lp->Send(iov, 2, bytes)>0) numsent++;
00711 }
00712
00713
00714
00715 return sendOK(numsent);
00716 }
00717
00718
00719
00720
00721
00722 void XrdXrootdAdmin::Xeq()
00723 {
00724 const char *epname = "Xeq";
00725 int rc;
00726 char *request, *tp;
00727
00728
00729
00730
00731 rc = 0;
00732 while((request = Stream.GetLine()) && !rc)
00733 {TRACE(DEBUG, "received admin request: '" <<request <<"'");
00734 if ((rc = getreqID())) continue;
00735 if ((tp = Stream.GetToken()))
00736 { if (!strcmp("abort", tp)) rc = do_Abort();
00737 else if (!strcmp("cj", tp)) rc = do_Cj();
00738 else if (!strcmp("cont", tp)) rc = do_Cont();
00739 else if (!strcmp("disc", tp)) rc = do_Disc();
00740 else if (!strcmp("lsc", tp)) rc = do_Lsc();
00741 else if (!strcmp("lsd", tp)) rc = do_Lsd();
00742 else if (!strcmp("lsj", tp)) rc = do_Lsj();
00743 else if (!strcmp("msg", tp)) rc = do_Msg();
00744 else if (!strcmp("pause", tp)) rc = do_Pause();
00745 else if (!strcmp("redirect", tp)) rc = do_Red();
00746 else {eDest->Emsg(epname, "invalid admin request,", tp);
00747 rc = sendErr(4, tp, "is an invalid request.");
00748 }
00749 }
00750 }
00751
00752
00753
00754 eDest->Emsg("Admin", "Admin", TraceID, "logged out");
00755 return;
00756 }