00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <errno.h>
00024 #ifdef WIN32
00025 #include <io.h>
00026 #endif
00027
00028 #include "Getline.h"
00029 #include "TList.h"
00030 #include "TObjArray.h"
00031 #include "TObjString.h"
00032 #include "TProof.h"
00033 #include "TProofLog.h"
00034 #include "TXProofMgr.h"
00035 #include "TXSocket.h"
00036 #include "TXSocketHandler.h"
00037 #include "TROOT.h"
00038 #include "TStopwatch.h"
00039 #include "TSysEvtHandler.h"
00040 #include "XProofProtocol.h"
00041
00042 ClassImp(TXProofMgr)
00043
00044
00045
00046
00047 class TProofMgrInterruptHandler : public TSignalHandler {
00048 private:
00049 TProofMgr *fMgr;
00050
00051 TProofMgrInterruptHandler(const TProofMgrInterruptHandler&);
00052 TProofMgrInterruptHandler& operator=(const TProofMgrInterruptHandler&);
00053 public:
00054 TProofMgrInterruptHandler(TProofMgr *mgr)
00055 : TSignalHandler(kSigInterrupt, kFALSE), fMgr(mgr) { }
00056 Bool_t Notify();
00057 };
00058
00059
00060 Bool_t TProofMgrInterruptHandler::Notify()
00061 {
00062
00063
00064
00065 if (isatty(0) != 0 && isatty(1) != 0) {
00066 TString u = fMgr->GetUrl();
00067 Printf("Opening new connection to %s", u.Data());
00068 TXSocket *s = new TXSocket(u, 'C', kPROOF_Protocol,
00069 kXPROOF_Protocol, 0, -1, (TXHandler *)fMgr);
00070 if (s && s->IsValid()) {
00071
00072 s->CtrlC();
00073 }
00074 }
00075 return kTRUE;
00076 }
00077
00078
00079
00080
00081 TProofMgr *GetTXProofMgr(const char *url, Int_t l, const char *al)
00082 { return ((TProofMgr *) new TXProofMgr(url, l, al)); }
00083
00084 class TXProofMgrInit {
00085 public:
00086 TXProofMgrInit() {
00087 TProofMgr::SetTXProofMgrHook(&GetTXProofMgr);
00088 }};
00089 static TXProofMgrInit gxproofmgr_init;
00090
00091
00092 TXProofMgr::TXProofMgr(const char *url, Int_t dbg, const char *alias)
00093 : TProofMgr(url, dbg, alias)
00094 {
00095
00096
00097
00098 fServType = kXProofd;
00099
00100
00101 if (Init(dbg) != 0) {
00102
00103
00104 SafeDelete(fSocket);
00105 }
00106 }
00107
00108
00109 Int_t TXProofMgr::Init(Int_t)
00110 {
00111
00112
00113
00114
00115
00116
00117
00118
00119 TString u = fUrl.GetUrl(kTRUE);
00120
00121 fSocket = 0;
00122 if (!(fSocket = new TXSocket(u, 'C', kPROOF_Protocol,
00123 kXPROOF_Protocol, 0, -1, this)) ||
00124 !(fSocket->IsValid())) {
00125 if (!fSocket || !(fSocket->IsServProofd()))
00126 if (gDebug > 0)
00127 Error("Init", "while opening the connection to %s - exit (error: %d)",
00128 u.Data(), (fSocket ? fSocket->GetOpenError() : -1));
00129 if (fSocket && fSocket->IsServProofd())
00130 fServType = TProofMgr::kProofd;
00131 return -1;
00132 }
00133
00134
00135 fRemoteProtocol = fSocket->GetRemoteProtocol();
00136
00137
00138 { R__LOCKGUARD2(gROOTMutex);
00139 gROOT->GetListOfSockets()->Remove(fSocket);
00140 }
00141
00142
00143 fIntHandler = new TProofMgrInterruptHandler(this);
00144
00145
00146 return 0;
00147 }
00148
00149
00150 TXProofMgr::~TXProofMgr()
00151 {
00152
00153
00154 SetInvalid();
00155 }
00156
00157
00158 void TXProofMgr::SetInvalid()
00159 {
00160
00161
00162 if (fSocket)
00163 fSocket->Close("P");
00164 SafeDelete(fSocket);
00165
00166
00167 { R__LOCKGUARD2(gROOTMutex);
00168 gROOT->GetListOfSockets()->Remove(this);
00169 }
00170 }
00171
00172
00173 TProof *TXProofMgr::AttachSession(TProofDesc *d, Bool_t gui)
00174 {
00175
00176
00177
00178
00179 if (!IsValid()) {
00180 Warning("AttachSession","invalid TXProofMgr - do nothing");
00181 return 0;
00182 }
00183 if (!d) {
00184 Warning("AttachSession","invalid description object - do nothing");
00185 return 0;
00186 }
00187
00188 if (d->GetProof())
00189
00190 return d->GetProof();
00191
00192
00193 TString u(Form("%s/?%d", fUrl.GetUrl(kTRUE), d->GetRemoteId()));
00194
00195
00196
00197 if (gui)
00198 u += "GUI";
00199
00200
00201 TProof *p = new TProof(u, 0, 0, gDebug, 0, this);
00202 if (p && p->IsValid()) {
00203
00204
00205 p->SetManager(this);
00206
00207
00208 Int_t st = (p->IsIdle()) ? TProofDesc::kIdle
00209 : TProofDesc::kRunning;
00210 d->SetStatus(st);
00211 d->SetProof(p);
00212
00213
00214 p->SetName(d->GetName());
00215
00216 } else {
00217
00218 Error("AttachSession", "attaching to PROOF session");
00219 }
00220 return p;
00221 }
00222
00223
00224 void TXProofMgr::DetachSession(Int_t id, Option_t *opt)
00225 {
00226
00227
00228
00229
00230
00231 if (!IsValid()) {
00232 Warning("DetachSession","invalid TXProofMgr - do nothing");
00233 return;
00234 }
00235
00236 if (id > 0) {
00237
00238 TProofDesc *d = GetProofDesc(id);
00239 if (d) {
00240 if (fSocket)
00241 fSocket->DisconnectSession(d->GetRemoteId(), opt);
00242 TProof *p = d->GetProof();
00243 fSessions->Remove(d);
00244 SafeDelete(p);
00245 delete d;
00246 }
00247 } else if (id == 0) {
00248
00249
00250 if (fSocket) {
00251 TString o = Form("%sA",opt);
00252 fSocket->DisconnectSession(-1, o);
00253 }
00254 if (fSessions) {
00255
00256 TIter nxd(fSessions);
00257 TProofDesc *d = 0;
00258 while ((d = (TProofDesc *)nxd())) {
00259 TProof *p = d->GetProof();
00260 SafeDelete(p);
00261 }
00262 fSessions->Delete();
00263 }
00264 }
00265
00266 return;
00267 }
00268
00269
00270 void TXProofMgr::DetachSession(TProof *p, Option_t *opt)
00271 {
00272
00273
00274
00275 if (!IsValid()) {
00276 Warning("DetachSession","invalid TXProofMgr - do nothing");
00277 return;
00278 }
00279
00280 if (p) {
00281
00282 TProofDesc *d = GetProofDesc(p);
00283 if (d) {
00284 if (fSocket)
00285 fSocket->DisconnectSession(d->GetRemoteId(), opt);
00286 fSessions->Remove(d);
00287 p->Close(opt);
00288 delete d;
00289 }
00290 }
00291
00292 return;
00293 }
00294
00295
00296 Bool_t TXProofMgr::MatchUrl(const char *url)
00297 {
00298
00299
00300
00301
00302
00303 if (!IsValid()) {
00304 Warning("MatchUrl","invalid TXProofMgr - do nothing");
00305 return 0;
00306 }
00307
00308 TUrl u(url);
00309
00310
00311 if (!strcmp(u.GetProtocol(), TUrl("a").GetProtocol()))
00312 u.SetProtocol("proof");
00313
00314 if (u.GetPort() == TUrl("a").GetPort()) {
00315
00316 Int_t port = gSystem->GetServiceByName("proofd");
00317 if (port < 0)
00318 port = 1093;
00319 u.SetPort(port);
00320 }
00321
00322
00323 if (!strcmp(u.GetHostFQDN(), fUrl.GetHost()))
00324 if (u.GetPort() == fUrl.GetPort() ||
00325 u.GetPort() == fSocket->GetPort())
00326 if (strlen(u.GetUser()) <= 0 || !strcmp(u.GetUser(),fUrl.GetUser()))
00327 return kTRUE;
00328
00329
00330 return kFALSE;
00331 }
00332
00333
00334 void TXProofMgr::ShowWorkers()
00335 {
00336
00337
00338 if (!IsValid()) {
00339 Warning("ShowWorkers","invalid TXProofMgr - do nothing");
00340 return;
00341 }
00342
00343
00344 TObjString *os = fSocket->SendCoordinator(kQueryWorkers);
00345 if (os) {
00346 TObjArray *oa = TString(os->GetName()).Tokenize(TString("&"));
00347 if (oa) {
00348 TIter nxos(oa);
00349 TObjString *to = 0;
00350 while ((to = (TObjString *) nxos()))
00351
00352 Printf("+ %s", to->GetName());
00353 }
00354 }
00355 }
00356
00357
00358 TList *TXProofMgr::QuerySessions(Option_t *opt)
00359 {
00360
00361
00362 if (opt && !strncasecmp(opt,"L",1))
00363
00364 return fSessions;
00365
00366
00367 if (!IsValid()) {
00368 Warning("QuerySessions","invalid TXProofMgr - do nothing");
00369 return 0;
00370 }
00371
00372
00373 if (!fSessions) {
00374 fSessions = new TList();
00375 fSessions->SetOwner();
00376 }
00377
00378
00379 TList *ocl = new TList;
00380 TObjString *os = fSocket->SendCoordinator(kQuerySessions);
00381 if (os) {
00382 TObjArray *oa = TString(os->GetName()).Tokenize(TString("|"));
00383 if (oa) {
00384 TProofDesc *d = 0;
00385 TIter nxos(oa);
00386 TObjString *to = (TObjString *) nxos();
00387 while ((to = (TObjString *) nxos())) {
00388
00389 char al[256];
00390 char tg[256];
00391 Int_t id = -1, st = -1, nc = 0;
00392 sscanf(to->GetName(),"%d %s %s %d %d", &id, tg, al, &st, &nc);
00393
00394 if (!(d = (TProofDesc *) fSessions->FindObject(tg))) {
00395 Int_t locid = fSessions->GetSize() + 1;
00396 d = new TProofDesc(tg, al, GetUrl(), locid, id, st, 0);
00397 fSessions->Add(d);
00398 } else {
00399
00400 d->SetStatus(st);
00401 d->SetRemoteId(id);
00402 d->SetTitle(al);
00403 }
00404
00405 ocl->Add(new TObjString(tg));
00406 }
00407 SafeDelete(oa);
00408 }
00409 SafeDelete(os);
00410 }
00411
00412
00413 if (fSessions->GetSize() > 0) {
00414 TIter nxd(fSessions);
00415 TProofDesc *d = 0;
00416 while ((d = (TProofDesc *)nxd())) {
00417 if (ocl->FindObject(d->GetName())) {
00418 if (opt && !strncasecmp(opt,"S",1))
00419 d->Print("");
00420 } else {
00421 fSessions->Remove(d);
00422 SafeDelete(d);
00423 }
00424 }
00425 }
00426
00427
00428 return fSessions;
00429 }
00430
00431
00432 Bool_t TXProofMgr::HandleInput(const void *)
00433 {
00434
00435
00436 if (fSocket && fSocket->IsValid()) {
00437 TMessage *mess;
00438 if (fSocket->Recv(mess) >= 0) {
00439 Int_t what = mess->What();
00440 if (gDebug > 0)
00441 Info("HandleInput", "%p: got message type: %d", this, what);
00442 switch (what) {
00443 case kPROOF_TOUCH:
00444 fSocket->RemoteTouch();
00445 break;
00446 default:
00447 Warning("HandleInput", "%p: got unknown message type: %d", this, what);
00448 break;
00449 }
00450 }
00451 } else {
00452 Warning("HandleInput", "%p: got message but socket is invalid!", this);
00453 }
00454
00455
00456 return kTRUE;
00457 }
00458
00459
00460 Bool_t TXProofMgr::HandleError(const void *in)
00461 {
00462
00463
00464 XHandleErr_t *herr = in ? (XHandleErr_t *)in : 0;
00465
00466
00467 if (fSocket && herr && (herr->fOpt == 1)) {
00468 fSocket->Reconnect();
00469 if (fSocket && fSocket->IsValid()) {
00470 if (gDebug > 0)
00471 Printf("ProofMgr: connection to coordinator at %s re-established",
00472 fUrl.GetUrl());
00473 return kFALSE;
00474 }
00475 }
00476 Printf("TXProofMgr::HandleError: %p: got called ...", this);
00477
00478
00479 if (fSessions && fSessions->GetSize() > 0) {
00480 TIter nxd(fSessions);
00481 TProofDesc *d = 0;
00482 while ((d = (TProofDesc *)nxd())) {
00483 TProof *p = (TProof *) d->GetProof();
00484 if (p)
00485 p->InterruptCurrentMonitor();
00486 }
00487 }
00488 if (gDebug > 0)
00489 Printf("TXProofMgr::HandleError: %p: DONE ... ", this);
00490
00491
00492 return kTRUE;
00493 }
00494
00495
00496 Int_t TXProofMgr::Reset(Bool_t hard, const char *usr)
00497 {
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509 if (!IsValid()) {
00510 Warning("Reset","invalid TXProofMgr - do nothing");
00511 return -1;
00512 }
00513
00514 Int_t h = (hard) ? 1 : 0;
00515 fSocket->SendCoordinator(kCleanupSessions, usr, h);
00516
00517 return 0;
00518 }
00519
00520
00521 TProofLog *TXProofMgr::GetSessionLogs(Int_t isess,
00522 const char *stag, const char *pattern)
00523 {
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542 if (!IsValid()) {
00543 Warning("GetSessionLogs","invalid TXProofMgr - do nothing");
00544 return 0;
00545 }
00546
00547 TProofLog *pl = 0;
00548
00549
00550 isess = (isess > 0) ? -isess : isess;
00551
00552
00553 bool retrieve = 1;
00554 TString sesstag(stag);
00555 if (sesstag == "NR") {
00556 retrieve = 0;
00557 sesstag = "";
00558 }
00559
00560
00561 TObjString *os = fSocket->SendCoordinator(kQueryLogPaths, sesstag.Data(), isess);
00562
00563
00564 Int_t ii = 0;
00565 if (os) {
00566 TString rs(os->GetName());
00567 Ssiz_t from = 0;
00568
00569 TString tag;
00570 if (!rs.Tokenize(tag, from, "|")) {
00571 Warning("GetSessionLogs", "Session tag undefined: corruption?\n"
00572 " (received string: %s)", os->GetName());
00573 return (TProofLog *)0;
00574 }
00575
00576 TString purl;
00577 if (!rs.Tokenize(purl, from, "|")) {
00578 Warning("GetSessionLogs", "Pool URL undefined: corruption?\n"
00579 " (received string: %s)", os->GetName());
00580 return (TProofLog *)0;
00581 }
00582
00583 if (!pl)
00584 pl = new TProofLog(tag, GetUrl(), this);
00585
00586
00587 TString to;
00588 while (rs.Tokenize(to, from, "|")) {
00589 if (!to.IsNull()) {
00590 TString ord(to);
00591 ord.Strip(TString::kLeading, ' ');
00592 TString url(ord);
00593 if ((ii = ord.Index(" ")) != kNPOS)
00594 ord.Remove(ii);
00595 if ((ii = url.Index(" ")) != kNPOS)
00596 url.Remove(0, ii + 1);
00597
00598 if (url.Contains(".valgrind")) ord += "-valgrind";
00599 pl->Add(ord, url);
00600
00601 if (gDebug > 1)
00602 Info("GetSessionLogs", "ord: %s, url: %s", ord.Data(), url.Data());
00603 }
00604 }
00605
00606 SafeDelete(os);
00607
00608 if (pl && retrieve) {
00609 if (pattern && strlen(pattern) > 0)
00610 pl->Retrieve("*", TProofLog::kGrep, 0, pattern);
00611 else
00612 pl->Retrieve();
00613 }
00614 }
00615
00616
00617 return pl;
00618 }
00619
00620
00621 TObjString *TXProofMgr::ReadBuffer(const char *fin, Long64_t ofs, Int_t len)
00622 {
00623
00624
00625
00626
00627 if (!IsValid()) {
00628 Warning("ReadBuffer","invalid TXProofMgr - do nothing");
00629 return (TObjString *)0;
00630 }
00631
00632
00633 return fSocket->SendCoordinator(kReadBuffer, fin, len, ofs, 0);
00634 }
00635
00636
00637 TObjString *TXProofMgr::ReadBuffer(const char *fin, const char *pattern)
00638 {
00639
00640
00641
00642
00643 if (!IsValid()) {
00644 Warning("ReadBuffer","invalid TXProofMgr - do nothing");
00645 return (TObjString *)0;
00646 }
00647
00648
00649 Int_t plen = strlen(pattern);
00650 Int_t lfi = strlen(fin);
00651 char *buf = new char[lfi + plen + 1];
00652 memcpy(buf, fin, lfi);
00653 memcpy(buf+lfi, pattern, plen);
00654 buf[lfi+plen] = 0;
00655
00656
00657 return fSocket->SendCoordinator(kReadBuffer, buf, plen, 0, 1);
00658 }
00659
00660
00661 void TXProofMgr::ShowROOTVersions()
00662 {
00663
00664
00665
00666 if (!IsValid()) {
00667 Warning("ShowROOTVersions","invalid TXProofMgr - do nothing");
00668 return;
00669 }
00670
00671
00672 TObjString *os = fSocket->SendCoordinator(kQueryROOTVersions);
00673 if (os) {
00674
00675 Printf("----------------------------------------------------------\n");
00676 Printf("Available versions (tag ROOT-vers remote-path PROOF-version):\n");
00677 Printf("%s", os->GetName());
00678 Printf("----------------------------------------------------------");
00679 SafeDelete(os);
00680 }
00681
00682
00683 return;
00684 }
00685
00686
00687 void TXProofMgr::SetROOTVersion(const char *tag)
00688 {
00689
00690
00691
00692 if (!IsValid()) {
00693 Warning("SetROOTVersion","invalid TXProofMgr - do nothing");
00694 return;
00695 }
00696
00697
00698 fSocket->SendCoordinator(kROOTVersion, tag);
00699
00700
00701 return;
00702 }
00703
00704
00705 Int_t TXProofMgr::SendMsgToUsers(const char *msg, const char *usr)
00706 {
00707
00708
00709
00710
00711
00712
00713
00714 Int_t rc = 0;
00715
00716
00717 if (!msg || strlen(msg) <= 0) {
00718 Error("SendMsgToUsers","no message to send - do nothing");
00719 return -1;
00720 }
00721
00722
00723 const Int_t kMAXBUF = 32768;
00724 char buf[kMAXBUF] = {0};
00725 char *p = &buf[0];
00726 Int_t space = kMAXBUF - 1;
00727 Int_t len = 0;
00728 Int_t lusr = 0;
00729
00730
00731 if (usr && strlen(usr) > 0 && (strlen(usr) != 1 || usr[0] != '*')) {
00732 lusr = (strlen(usr) + 3);
00733 sprintf(buf, "u:%s ", usr);
00734 p += lusr;
00735 space -= lusr;
00736 }
00737
00738
00739 if (!gSystem->AccessPathName(msg, kFileExists)) {
00740
00741 if (gSystem->AccessPathName(msg, kReadPermission)) {
00742 Error("SendMsgToUsers","request to read message from unreadable file '%s'", msg);
00743 return -1;
00744 }
00745
00746 FILE *f = 0;
00747 if (!(f = fopen(msg, "r"))) {
00748 Error("SendMsgToUsers", "file '%s' cannot be open", msg);
00749 return -1;
00750 }
00751
00752 Int_t left = (Int_t) lseek(fileno(f), (off_t) 0, SEEK_END);
00753 lseek(fileno(f), (off_t) 0, SEEK_SET);
00754
00755 Int_t wanted = left;
00756 if (wanted > space) {
00757 wanted = space;
00758 Warning("SendMsgToUsers",
00759 "requested to send %d bytes: max size is %d bytes: truncating", left, space);
00760 }
00761 do {
00762 while ((len = read(fileno(f), p, wanted)) < 0 &&
00763 TSystem::GetErrno() == EINTR)
00764 TSystem::ResetErrno();
00765 if (len < 0) {
00766 SysError("SendMsgToUsers", "error reading file");
00767 break;
00768 }
00769
00770
00771 left -= len;
00772 p += len;
00773 wanted = (left > kMAXBUF-1) ? kMAXBUF-1 : left;
00774
00775 } while (len > 0 && left > 0);
00776 } else {
00777
00778 len = strlen(msg);
00779 if (len > space) {
00780 Warning("SendMsgToUsers",
00781 "requested to send %d bytes: max size is %d bytes: truncating", len, space);
00782 len = space;
00783 }
00784 memcpy(p, msg, len);
00785 }
00786
00787
00788 buf[len + lusr] = 0;
00789
00790
00791 fSocket->SendCoordinator(kSendMsgToUser, buf);
00792
00793 return rc;
00794 }
00795
00796
00797 void TXProofMgr::Grep(const char *what, const char *how, const char *where)
00798 {
00799
00800
00801
00802 if (!IsValid()) {
00803 Warning("Grep","invalid TXProofMgr - do nothing");
00804 return;
00805 }
00806
00807 if (fSocket->GetXrdProofdVersion() < 1006) {
00808 Warning("Grep", "functionality not supported by server");
00809 return;
00810 }
00811
00812
00813 TObjString *os = Exec(kGrep, what, how, where);
00814
00815
00816 if (os) Printf("%s", os->GetName());
00817
00818
00819 SafeDelete(os);
00820 }
00821
00822
00823 void TXProofMgr::Find(const char *what, const char *how, const char *where)
00824 {
00825
00826
00827
00828 if (!IsValid()) {
00829 Warning("Find","invalid TXProofMgr - do nothing");
00830 return;
00831 }
00832
00833 if (fSocket->GetXrdProofdVersion() < 1006) {
00834 Warning("Find", "functionality not supported by server (XrdProofd version: %d)",
00835 fSocket->GetXrdProofdVersion());
00836 return;
00837 }
00838
00839
00840 TObjString *os = Exec(kFind, what, how, where);
00841
00842
00843 if (os) Printf("%s", os->GetName());
00844
00845
00846 SafeDelete(os);
00847 }
00848
00849
00850 void TXProofMgr::Ls(const char *what, const char *how, const char *where)
00851 {
00852
00853
00854
00855 if (!IsValid()) {
00856 Warning("Ls","invalid TXProofMgr - do nothing");
00857 return;
00858 }
00859
00860 if (fSocket->GetXrdProofdVersion() < 1006) {
00861 Warning("Ls", "functionality not supported by server");
00862 return;
00863 }
00864
00865
00866 TObjString *os = Exec(kLs, what, how, where);
00867
00868
00869 if (os) Printf("%s", os->GetName());
00870
00871
00872 SafeDelete(os);
00873 }
00874
00875
00876 void TXProofMgr::More(const char *what, const char *how, const char *where)
00877 {
00878
00879
00880
00881 if (!IsValid()) {
00882 Warning("More","invalid TXProofMgr - do nothing");
00883 return;
00884 }
00885
00886 if (fSocket->GetXrdProofdVersion() < 1006) {
00887 Warning("More", "functionality not supported by server");
00888 return;
00889 }
00890
00891
00892 TObjString *os = Exec(kMore, what, how, where);
00893
00894
00895 if (os) Printf("%s", os->GetName());
00896
00897
00898 SafeDelete(os);
00899 }
00900
00901
00902 Int_t TXProofMgr::Rm(const char *what, const char *how, const char *where)
00903 {
00904
00905
00906
00907
00908
00909 if (!IsValid()) {
00910 Warning("Rm","invalid TXProofMgr - do nothing");
00911 return -1;
00912 }
00913
00914 if (fSocket->GetXrdProofdVersion() < 1006) {
00915 Warning("Rm", "functionality not supported by server");
00916 return -1;
00917 }
00918
00919 TString prompt, ans("Y"), opt(how);
00920 Bool_t force = kFALSE;
00921 if (!opt.IsNull()) {
00922 TString t;
00923 Int_t from = 0;
00924 while (!force && opt.Tokenize(t, from, " ")) {
00925 if (t == "--force") {
00926 force = kTRUE;
00927 } else if (t.BeginsWith("-") && !t.BeginsWith("--") && t.Contains("f")) {
00928 force = kTRUE;
00929 }
00930 }
00931 }
00932
00933 if (!force && isatty(0) != 0 && isatty(1) != 0) {
00934
00935 prompt.Form("Do you really want to remove '%s'? [N/y]", what);
00936 ans = "";
00937 while (ans != "N" && ans != "Y") {
00938 ans = Getline(prompt.Data());
00939 ans.Remove(TString::kTrailing, '\n');
00940 if (ans == "") ans = "N";
00941 ans.ToUpper();
00942 if (ans != "N" && ans != "Y")
00943 Printf("Please answer y, Y, n or N");
00944 }
00945 }
00946
00947 if (ans == "Y") {
00948
00949 TObjString *os = Exec(kRm, what, how, where);
00950
00951 if (os) {
00952 if (gDebug > 1) Printf("%s", os->GetName());
00953
00954 SafeDelete(os);
00955
00956 return 0;
00957 }
00958
00959 return -1;
00960 }
00961
00962 return 0;
00963 }
00964
00965
00966 void TXProofMgr::Tail(const char *what, const char *how, const char *where)
00967 {
00968
00969
00970
00971 if (!IsValid()) {
00972 Warning("Tail","invalid TXProofMgr - do nothing");
00973 return;
00974 }
00975
00976 if (fSocket->GetXrdProofdVersion() < 1006) {
00977 Warning("Tail", "functionality not supported by server");
00978 return;
00979 }
00980
00981
00982 TObjString *os = Exec(kTail, what, how, where);
00983
00984
00985 if (os) Printf("%s", os->GetName());
00986
00987
00988 SafeDelete(os);
00989 }
00990
00991
00992 Int_t TXProofMgr::Md5sum(const char *what, TString &sum, const char *where)
00993 {
00994
00995
00996
00997 if (!IsValid()) {
00998 Warning("Md5sum","invalid TXProofMgr - do nothing");
00999 return -1;
01000 }
01001
01002 if (fSocket->GetXrdProofdVersion() < 1006) {
01003 Warning("Md5sum", "functionality not supported by server");
01004 return -1;
01005 }
01006
01007 if (where && !strcmp(where, "all")) {
01008 Warning("Md5sum","cannot run on all nodes at once: please specify one");
01009 return -1;
01010 }
01011
01012
01013 TObjString *os = Exec(kMd5sum, what, 0, where);
01014
01015
01016 if (os) {
01017 if (gDebug > 1) Printf("%s", os->GetName());
01018 sum = os->GetName();
01019
01020 SafeDelete(os);
01021
01022 return 0;
01023 }
01024
01025 return -1;
01026 }
01027
01028
01029 Int_t TXProofMgr::Stat(const char *what, FileStat_t &st, const char *where)
01030 {
01031
01032
01033
01034 if (!IsValid()) {
01035 Warning("Stat","invalid TXProofMgr - do nothing");
01036 return -1;
01037 }
01038
01039 if (fSocket->GetXrdProofdVersion() < 1006) {
01040 Warning("Stat", "functionality not supported by server");
01041 return -1;
01042 }
01043
01044 if (where && !strcmp(where, "all")) {
01045 Warning("Stat","cannot run on all nodes at once: please specify one");
01046 return -1;
01047 }
01048
01049
01050 TObjString *os = Exec(kStat, what, 0, where);
01051
01052
01053 if (os) {
01054 if (gDebug > 1) Printf("%s", os->GetName());
01055 Int_t mode, uid, gid, islink;
01056 Long_t dev, ino, mtime;
01057 Long64_t size;
01058 #ifdef R__WIN32
01059 sscanf(os->GetName(), "%ld %ld %d %d %d %I64d %ld %d", &dev, &ino, &mode,
01060 &uid, &gid, &size, &mtime, &islink);
01061 #else
01062 sscanf(os->GetName(), "%ld %ld %d %d %d %lld %ld %d", &dev, &ino, &mode,
01063 &uid, &gid, &size, &mtime, &islink);
01064 #endif
01065 if (dev == -1)
01066 return -1;
01067 st.fDev = dev;
01068 st.fIno = ino;
01069 st.fMode = mode;
01070 st.fUid = uid;
01071 st.fGid = gid;
01072 st.fSize = size;
01073 st.fMtime = mtime;
01074 st.fIsLink = (islink == 1);
01075
01076
01077 SafeDelete(os);
01078
01079 return 0;
01080 }
01081
01082 return -1;
01083 }
01084
01085
01086 TObjString *TXProofMgr::Exec(Int_t action,
01087 const char *what, const char *how, const char *where)
01088 {
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101 if (!IsValid()) {
01102 Warning("Exec","invalid TXProofMgr - do nothing");
01103 return (TObjString *)0;
01104 }
01105
01106 if (fSocket->GetXrdProofdVersion() < 1006) {
01107 Warning("Exec", "functionality not supported by server");
01108 return (TObjString *)0;
01109 }
01110
01111 if (!what || strlen(what) <= 0) {
01112 Error("Exec","specifying a path is mandatory");
01113 return (TObjString *)0;
01114 }
01115
01116 TString opt(how);
01117 if (action == kTail && !opt.IsNull()) {
01118
01119 TString opts(how), o;
01120 Int_t from = 0;
01121 Bool_t isc = kFALSE, isn = kFALSE;
01122 while (opts.Tokenize(o, from, " ")) {
01123
01124 if (!o.BeginsWith("-") && !isc && isn) continue;
01125 if (isc) {
01126 opt.Form("-c %s", o.Data());
01127 isc = kFALSE;
01128 }
01129 if (isn) {
01130 opt.Form("-n %s", o.Data());
01131 isn = kFALSE;
01132 }
01133 if (o == "-c") {
01134 isc = kTRUE;
01135 } else if (o == "-n") {
01136 isn = kTRUE;
01137 } else if (o == "--bytes=" || o == "--lines=") {
01138 opt = o;
01139 } else if (o.BeginsWith("-")) {
01140 o.Remove(TString::kLeading,'-');
01141 if (o.IsDigit()) opt.Form("-%s", o.Data());
01142 }
01143 }
01144 }
01145
01146
01147 TString cmd(where);
01148 if (cmd.IsNull()) cmd.Form("%s:%d", fUrl.GetHost(), fUrl.GetPort());
01149 cmd += "|";
01150 cmd += what;
01151 cmd += "|";
01152 cmd += opt;
01153
01154
01155 if (fIntHandler) fIntHandler->Add();
01156
01157
01158 TObjString *os = fSocket->SendCoordinator(kExec, cmd.Data(), action);
01159
01160
01161 if (fIntHandler) fIntHandler->Remove();
01162
01163
01164 return os;
01165 }
01166
01167
01168 Int_t TXProofMgr::GetFile(const char *remote, const char *local, const char *opt)
01169 {
01170
01171
01172
01173
01174
01175
01176
01177 Int_t rc = -1;
01178
01179 if (!IsValid()) {
01180 Warning("GetFile", "invalid TXProofMgr - do nothing");
01181 return rc;
01182 }
01183
01184 if (fSocket->GetXrdProofdVersion() < 1006) {
01185 Warning("GetFile", "functionality not supported by server");
01186 return rc;
01187 }
01188
01189
01190 TString filerem(remote);
01191 if (filerem.IsNull()) {
01192 Error("GetFile", "remote file path undefined");
01193 return rc;
01194 }
01195
01196
01197 TString oo(opt);
01198 oo.ToUpper();
01199 Bool_t force = (oo.Contains("FORCE")) ? kTRUE : kFALSE;
01200 Bool_t silent = (oo.Contains("SILENT")) ? kTRUE : kFALSE;
01201
01202
01203 TString fileloc(local);
01204 if (fileloc.IsNull()) {
01205
01206 fileloc = gSystem->BaseName(filerem);
01207 }
01208 gSystem->ExpandPathName(fileloc);
01209
01210
01211 #ifdef WIN32
01212 UInt_t openflags = O_WRONLY | O_BINARY;
01213 #else
01214 UInt_t openflags = O_WRONLY;
01215 #endif
01216 UInt_t openmode = 0600;
01217
01218
01219 UserGroup_t *ugloc = 0;
01220 Int_t rcloc = 0;
01221 FileStat_t stloc;
01222 if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) == 0) {
01223 if (R_ISDIR(stloc.fMode)) {
01224
01225 if (!fileloc.EndsWith("/")) fileloc += "/";
01226 fileloc += gSystem->BaseName(filerem);
01227
01228 rcloc = gSystem->GetPathInfo(fileloc, stloc);
01229 }
01230 if (rcloc == 0) {
01231
01232 if (!R_ISREG(stloc.fMode)) {
01233 if (!silent)
01234 Printf("[GetFile] local file '%s' exists and is not regular: cannot continue",
01235 fileloc.Data());
01236 return rc;
01237 }
01238
01239 if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
01240 Error("GetFile", "cannot get user info for additional checks");
01241 return rc;
01242 }
01243
01244 Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01245 Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01246 Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
01247 delete ugloc;
01248 if ((owner && !(stloc.fMode & kS_IWUSR)) ||
01249 (group && !(stloc.fMode & kS_IWGRP)) || (other && !(stloc.fMode & kS_IWOTH))) {
01250 if (!silent) {
01251 Printf("[GetFile] file '%s' exists: no permission to delete or overwrite the file", fileloc.Data());
01252 Printf("[GetFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
01253 Printf("[GetFile] mode: %x", stloc.fMode);
01254 }
01255 return rc;
01256 }
01257
01258 openflags |= O_CREAT | O_TRUNC;
01259 } else {
01260
01261 openflags |= O_CREAT;
01262 }
01263 } else {
01264
01265 openflags |= O_CREAT;
01266 }
01267
01268
01269 TString remsum;
01270 if (Md5sum(filerem, remsum) != 0) {
01271 if (!silent)
01272 Printf("[GetFile] remote file '%s' does not exists or cannot be read", filerem.Data());
01273 return rc;
01274 }
01275
01276
01277 bool same = 0;
01278 if (rcloc == 0 && !force) {
01279 TMD5 *md5loc = TMD5::FileChecksum(fileloc);
01280 if (md5loc) {
01281 if (remsum == md5loc->AsString()) {
01282 if (!silent) {
01283 Printf("[GetFile] local file '%s' and remote file '%s' have the same MD5 check sum",
01284 fileloc.Data(), filerem.Data());
01285 Printf("[GetFile] use option 'force' to override");
01286 }
01287 same = 1;
01288 }
01289 delete md5loc;
01290 }
01291
01292
01293 if (!same) {
01294 char *a = Getline("Local file exists already: would you like to overwrite it? [N/y]");
01295 if (a[0] == 'n' || a[0] == 'N' || a[0] == '\0') return 0;
01296 } else {
01297 return 0;
01298 }
01299 }
01300
01301
01302 Int_t fdout = open(fileloc, openflags, openmode);
01303 if (fdout < 0) {
01304 Error("GetFile", "could not open local file '%s' for writing: errno: %d", local, errno);
01305 return rc;
01306 }
01307
01308
01309 TString cmd(filerem);
01310
01311
01312
01313 gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());
01314
01315
01316 TStopwatch watch;
01317 watch.Start();
01318 TObjString *os = fSocket->SendCoordinator(kGetFile, cmd.Data());
01319
01320 if (os) {
01321
01322 Long64_t size;
01323 sscanf(os->GetName(), "%lld", &size);
01324 if (size <= 0) {
01325 Error("GetFile", "received null or negative size: %lld", size);
01326 close(fdout);
01327 return rc;
01328 }
01329
01330
01331 const Int_t kMAXBUF = 16384;
01332 char buf[kMAXBUF];
01333
01334 rc = 0;
01335 Int_t rec, r;
01336 Long64_t filesize = 0, left = 0;
01337 while (rc == 0 && filesize < size) {
01338 left = size - filesize;
01339 if (left > kMAXBUF) left = kMAXBUF;
01340 rec = fSocket->RecvRaw(&buf, left);
01341 filesize = (rec > 0) ? (filesize + rec) : filesize;
01342 if (rec > 0) {
01343 char *p = buf;
01344 r = rec;
01345 while (r) {
01346 Int_t w = 0;
01347 while ((w = write(fdout, p, r)) < 0 && TSystem::GetErrno() == EINTR)
01348 TSystem::ResetErrno();
01349 if (w < 0) {
01350 SysError("GetFile", "error writing to unit: %d", fdout);
01351 rc = -1;
01352 break;
01353 }
01354 r -= w;
01355 p += w;
01356 }
01357
01358 CpProgress("GetFile", filesize, size, &watch);
01359 } else if (rec < 0) {
01360 rc = -1;
01361 Error("GetFile", "error during receiving file");
01362 break;
01363 }
01364 }
01365
01366 CpProgress("GetFile", filesize, size, &watch, kTRUE);
01367
01368 } else {
01369 Error("GetFile", "size not received");
01370 rc = -1;
01371 }
01372
01373
01374 gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
01375
01376
01377 close(fdout);
01378 watch.Stop();
01379 watch.Reset();
01380
01381 if (rc == 0) {
01382
01383 TMD5 *md5loc = TMD5::FileChecksum(fileloc);
01384 if (!md5loc) {
01385 Error("GetFile", "cannot get MD5 checksum of the new local file '%s'", fileloc.Data());
01386 rc = -1;
01387 } else if (remsum != md5loc->AsString()) {
01388 Error("GetFile", "checksums for the local copy and the remote file differ: {rem:%s,loc:%s}",
01389 remsum.Data(), md5loc->AsString());
01390 rc = -1;
01391 delete md5loc;
01392 }
01393 }
01394
01395 return rc;
01396 }
01397
01398
01399 Int_t TXProofMgr::PutFile(const char *local, const char *remote, const char *opt)
01400 {
01401
01402
01403
01404
01405
01406 Int_t rc = -1;
01407
01408 if (!IsValid()) {
01409 Warning("PutFile", "invalid TXProofMgr - do nothing");
01410 return rc;
01411 }
01412
01413 if (fSocket->GetXrdProofdVersion() < 1006) {
01414 Warning("PutFile", "functionality not supported by server");
01415 return rc;
01416 }
01417
01418
01419 TString fileloc(local);
01420 if (fileloc.IsNull()) {
01421 Error("PutFile", "local file path undefined");
01422 return rc;
01423 }
01424 gSystem->ExpandPathName(fileloc);
01425
01426
01427 TString oo(opt);
01428 oo.ToUpper();
01429 Bool_t force = (oo == "FORCE") ? kTRUE : kFALSE;
01430
01431
01432 TString filerem(remote);
01433 if (filerem.IsNull()) {
01434
01435 filerem.Form("~/%s", gSystem->BaseName(fileloc));
01436 } else if (filerem.EndsWith("/")) {
01437
01438 filerem += gSystem->BaseName(fileloc);
01439 }
01440
01441
01442 #ifdef WIN32
01443 UInt_t openflags = O_RDONLY | O_BINARY;
01444 #else
01445 UInt_t openflags = O_RDONLY;
01446 #endif
01447
01448
01449 Int_t rcloc = 0;
01450 FileStat_t stloc;
01451 if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) != 0 || !R_ISREG(stloc.fMode)) {
01452
01453 const char *why = (rcloc == 0) ? "is not regular" : "does not exists";
01454 Printf("[PutFile] local file '%s' %s: cannot continue", fileloc.Data(), why);
01455 return rc;
01456 }
01457
01458 UserGroup_t *ugloc = 0;
01459 if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
01460 Error("PutFile", "cannot get user info for additional checks");
01461 return rc;
01462 }
01463
01464 Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01465 Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
01466 Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
01467 delete ugloc;
01468 if ((owner && !(stloc.fMode & kS_IRUSR)) ||
01469 (group && !(stloc.fMode & kS_IRGRP)) || (other && !(stloc.fMode & kS_IROTH))) {
01470 Printf("[PutFile] file '%s': no permission to read the file", fileloc.Data());
01471 Printf("[PutFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
01472 Printf("[PutFile] mode: %x", stloc.fMode);
01473 return rc;
01474 }
01475
01476
01477 TString locsum;
01478 TMD5 *md5loc = TMD5::FileChecksum(fileloc);
01479 if (!md5loc) {
01480 Error("PutFile", "cannot calculate the check sum for '%s'", fileloc.Data());
01481 return rc;
01482 } else {
01483 locsum = md5loc->AsString();
01484 delete md5loc;
01485 }
01486
01487
01488 Bool_t same = kFALSE;
01489 FileStat_t strem;
01490 TString remsum;
01491 if (Stat(filerem, strem) == 0) {
01492 if (Md5sum(filerem, remsum) != 0) {
01493 Printf("[PutFile] remote file exists but the check sum calculation failed");
01494 return rc;
01495 }
01496
01497 if (remsum == locsum) {
01498 if (!force) {
01499 Printf("[PutFile] local file '%s' and remote file '%s' have the same MD5 check sum",
01500 fileloc.Data(), filerem.Data());
01501 Printf("[PutFile] use option 'force' to override");
01502 }
01503 same = kTRUE;
01504 }
01505 if (!force) {
01506
01507 if (!same) {
01508 char *a = Getline("Remote file exists already: would you like to overwrite it? [N/y]");
01509 if (a[0] == 'n' || a[0] == 'N' || a[0] == '\0') return 0;
01510 force = kTRUE;
01511 } else {
01512 return 0;
01513 }
01514 }
01515 }
01516
01517
01518 int fd = open(fileloc.Data(), openflags);
01519 if (fd < 0) {
01520 Error("PutFile", "cannot open file '%s': %d", fileloc.Data(), errno);
01521 return -1;
01522 }
01523
01524
01525 TString cmd;
01526 cmd.Form("%s %lld", filerem.Data(), stloc.fSize);
01527 if (force) cmd += " force";
01528
01529
01530
01531 gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());
01532
01533
01534 TStopwatch watch;
01535 watch.Start();
01536 TObjString *os = fSocket->SendCoordinator(kPutFile, cmd.Data());
01537
01538 if (os) {
01539
01540
01541 const Int_t kMAXBUF = 16384;
01542 char buf[kMAXBUF];
01543
01544 Long64_t pos = 0;
01545 lseek(fd, pos, SEEK_SET);
01546
01547 rc = 0;
01548 while (rc == 0 && pos < stloc.fSize) {
01549 Long64_t left = stloc.fSize - pos;
01550 if (left > kMAXBUF) left = kMAXBUF;
01551 Int_t siz;
01552 while ((siz = read(fd, &buf[0], left)) < 0 && TSystem::GetErrno() == EINTR)
01553 TSystem::ResetErrno();
01554 if (siz < 0 || siz != left) {
01555 Error("PutFile", "error reading from file: errno: %d", errno);
01556 rc = -1;
01557 break;
01558 }
01559 Int_t src = 0;
01560 if ((src = fSocket->fConn->WriteRaw((void *)&buf[0], left)) != left) {
01561 Error("PutFile", "error sending over: errno: %d (rc: %d)", TSystem::GetErrno(), src);
01562 rc = -1;
01563 break;
01564 }
01565
01566 CpProgress("PutFile", pos, stloc.fSize, &watch);
01567
01568 pos += left;
01569 }
01570
01571 CpProgress("PutFile", pos, stloc.fSize, &watch, kTRUE);
01572
01573 } else {
01574 Error("PutFile", "command could not be executed");
01575 rc = -1;
01576 }
01577
01578
01579 gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
01580
01581
01582 close(fd);
01583 watch.Stop();
01584 watch.Reset();
01585
01586 if (rc == 0) {
01587
01588 if (Md5sum(filerem, remsum) != 0) {
01589 Printf("[PutFile] cannot get MD5 checksum of the new remote file '%s'", filerem.Data());
01590 rc = -1;
01591 } else if (remsum != locsum) {
01592 Printf("[PutFile] checksums for the local copy and the remote file differ: {rem:%s, loc:%s}",
01593 remsum.Data(), locsum.Data());
01594 rc = -1;
01595 }
01596 }
01597
01598
01599 return rc;
01600 }
01601
01602
01603 void TXProofMgr::CpProgress(const char *pfx, Long64_t bytes,
01604 Long64_t size, TStopwatch *watch, Bool_t cr)
01605 {
01606
01607
01608
01609 if (!pfx || size == 0 || !watch) return;
01610
01611 fprintf(stderr, "[%s] Total %.02f MB\t|", pfx, (Double_t)size/1048576);
01612
01613 for (int l = 0; l < 20; l++) {
01614 if (size > 0) {
01615 if (l < 20*bytes/size)
01616 fprintf(stderr, "=");
01617 else if (l == 20*bytes/size)
01618 fprintf(stderr, ">");
01619 else if (l > 20*bytes/size)
01620 fprintf(stderr, ".");
01621 } else
01622 fprintf(stderr, "=");
01623 }
01624
01625 gSystem->ProcessEvents();
01626 watch->Stop();
01627 Double_t copytime = watch->RealTime();
01628 fprintf(stderr, "| %.02f %% [%.01f MB/s]\r",
01629 100.0*(size?(bytes/size):1), bytes/copytime/1048576.);
01630 if (cr) fprintf(stderr, "\n");
01631 watch->Continue();
01632 }
01633
01634
01635 Int_t TXProofMgr::Cp(const char *src, const char *dst, const char *fmt)
01636 {
01637
01638
01639
01640
01641 Int_t rc = -1;
01642
01643 if (!IsValid()) {
01644 Warning("Cp", "invalid TXProofMgr - do nothing");
01645 return rc;
01646 }
01647
01648 if (fSocket->GetXrdProofdVersion() < 1006) {
01649 Warning("Cp", "functionality not supported by server");
01650 return rc;
01651 }
01652
01653
01654 TString filesrc(src);
01655 if (filesrc.IsNull()) {
01656 Error("Cp", "source file path undefined");
01657 return rc;
01658 }
01659
01660 TString filedst(dst);
01661 if (filedst.IsNull()) {
01662 filedst = gSystem->BaseName(TUrl(filesrc.Data()).GetFile());
01663 } else if (filedst.EndsWith("/")) {
01664
01665 filedst += gSystem->BaseName(filesrc);
01666 }
01667
01668
01669
01670 TUrl usrc = TUrl(filesrc.Data(), kTRUE).GetUrl();
01671 filesrc = usrc.GetUrl();
01672 if (!strcmp(usrc.GetProtocol(), "file"))
01673 filesrc.Form("file://host/%s", usrc.GetFileAndOptions());
01674 TUrl udst = TUrl(filedst.Data(), kTRUE).GetUrl();
01675 filedst = udst.GetUrl();
01676 if (!strcmp(udst.GetProtocol(), "file"))
01677 filedst.Form("file://host/%s", udst.GetFileAndOptions());
01678
01679
01680 TString cmd;
01681 cmd.Form("%s %s %s", filesrc.Data(), filedst.Data(), (fmt ? fmt : ""));
01682
01683
01684 if (fIntHandler) fIntHandler->Add();
01685
01686
01687 TObjString *os = fSocket->SendCoordinator(kCpFile, cmd.Data());
01688
01689
01690 if (fIntHandler) fIntHandler->Remove();
01691
01692
01693 if (os) {
01694 if (gDebug > 0) Printf("%s", os->GetName());
01695 rc = 0;
01696 }
01697
01698
01699 return rc;
01700 }