00001
00002
00003
00004
00005
00006 #include "Bytes.h"
00007 #include "Getline.h"
00008 #include "TEnv.h"
00009 #include "TProof.h"
00010 #include "TSocket.h"
00011 #include "TString.h"
00012 #include "TSystem.h"
00013
00014
00015 Int_t getXrootdPid(Int_t port, const char *subdir = "xpdtut");
00016 Int_t checkXrootdAt(Int_t port, const char *host = "localhost");
00017 Int_t checkXproofdAt(Int_t port, const char *host = "localhost");
00018 Int_t startXrootdAt(Int_t port, const char *exportdirs = 0, Bool_t force = kFALSE);
00019 Int_t killXrootdAt(Int_t port, const char *id = 0);
00020
00021
00022
// Client-side handshake record: five ints sent raw over the socket by the
// check*At() probes. The probes zero the whole record and then fill only the
// fields they need, already converted to network byte order.
struct clnt_HS_t {
   int first;
   int second;
   int third;    // set to 1 by checkXproofdAt()
   int fourth;   // set to 4 by checkXrootdAt()
   int fifth;    // set to 2012 by checkXrootdAt()
};
00030
// Server-side handshake record: three ints read raw from the socket by the
// check*At() probes after the leading 'type' word has been received as 0.
struct srv_HS_t {
   int msglen;
   int protover;
   int msgval;
};
00036
00037
00038 const char *refloc = "proof://localhost:40000";
00039
00040 TProof *getProof(const char *url = "proof://localhost:40000", Int_t nwrks = -1, const char *dir = 0,
00041 const char *opt = "ask", Bool_t dyn = kFALSE, Bool_t tutords = kFALSE)
00042 {
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071 #ifdef __CINT__
00072 Printf("getProof: this script can only be executed via ACliC:");
00073 Printf("getProof: root [] .x <path>/getProof.C+");
00074 Printf("getProof: or root [] .L <path>/getProof.C+");
00075 Printf("getProof: root [] getProof(...)");
00076 return;
00077 #endif
00078
00079 TProof *p = 0;
00080
00081
00082 TString vopt, vopts;
00083 #ifndef WIN32
00084 if (gSystem->Getenv("GETPROOF_VALGRIND")) {
00085 TString s(gSystem->Getenv("GETPROOF_VALGRIND")), t;
00086 Int_t from = 0;
00087 while (s.Tokenize(t, from , " ")) {
00088 if (t.BeginsWith("valgrind_opts:"))
00089 vopts = t;
00090 else
00091 vopt = t;
00092 }
00093 if (vopts.IsNull()) vopts = "valgrind_opts:--leak-check=full --track-origins=yes";
00094 TProof::AddEnvVar("PROOF_WRAPPERCMD", vopts.Data());
00095 Printf("getProof: valgrind run: '%s' (opts: '%s')", vopt.Data(), vopts.Data());
00096 }
00097 #endif
00098
00099
00100 TUrl uu(url), uref(refloc);
00101 Bool_t ext = (strcmp(uu.GetHost(), uref.GetHost()) ||
00102 (uu.GetPort() != uref.GetPort())) ? kTRUE : kFALSE;
00103 if (ext && url && strlen(url) > 0) {
00104 if (!strcmp(url, "lite")) {
00105 if (dir && strlen(dir) > 0) gEnv->SetValue("Proof.Sandbox", dir);
00106 if (nwrks > 0) uu.SetOptions(Form("workers=%d", nwrks));
00107 }
00108 p = TProof::Open(uu.GetUrl(), vopt);
00109 if (p && p->IsValid()) {
00110
00111 if (ext && nwrks > 0) {
00112 Printf("getProof: WARNING: started/attached a session on external cluster (%s):"
00113 " 'nwrks=%d' ignored", url, nwrks);
00114 }
00115 if (ext && dir && strlen(dir) > 0) {
00116 Printf("getProof: WARNING: started/attached a session on external cluster (%s):"
00117 " 'dir=\"%s\"' ignored", url, dir);
00118 }
00119 if (ext && !strcmp(opt,"force")) {
00120 Printf("getProof: WARNING: started/attached a session on external cluster (%s):"
00121 " 'opt=\"force\"' ignored", url);
00122 }
00123 if (ext && dyn) {
00124 Printf("getProof: WARNING: started/attached a session on external cluster (%s):"
00125 " 'dyn=kTRUE' ignored", url);
00126 }
00127
00128 return p;
00129 } else {
00130 if (ext)
00131 Printf("getProof: could not get/start a valid session at %s - try local", url);
00132 }
00133 if (p) delete p;
00134 p = 0;
00135 }
00136
00137 #ifdef WIN32
00138
00139 Printf("getProof: local PROOF not yet supported on Windows, sorry!");
00140 return p;
00141 #else
00142
00143
00144 TString tutdir = dir;
00145 if (tutdir.IsNull() || gSystem->AccessPathName(dir, kWritePermission)) {
00146 Printf("getProof: tutorial dir missing or not writable - try temp ");
00147
00148 tutdir="/tmp";
00149 if (gSystem->AccessPathName(tutdir, kWritePermission)) tutdir = gSystem->TempDirectory();
00150 TString us;
00151 UserGroup_t *ug = gSystem->GetUserInfo(gSystem->GetUid());
00152 if (!ug) {
00153 Printf("getProof: could not get user info");
00154 return p;
00155 }
00156 us.Form("/%s", ug->fUser.Data());
00157 if (!tutdir.EndsWith(us.Data())) tutdir += us;
00158 gSystem->mkdir(tutdir.Data(), kTRUE);
00159 if (gSystem->AccessPathName(tutdir, kWritePermission)) {
00160 Printf("getProof: unable to get a writable tutorial directory (tried: %s)"
00161 " - cannot continue", tutdir.Data());
00162 return p;
00163 }
00164 }
00165 Printf("getProof: tutorial dir: %s", tutdir.Data());
00166
00167
00168 TString datasetdir;
00169 if (tutords) {
00170 datasetdir = Form("%s/dataset", tutdir.Data());
00171 if (gSystem->AccessPathName(datasetdir, kWritePermission)) {
00172 gSystem->mkdir(datasetdir, kTRUE);
00173 if (gSystem->AccessPathName(datasetdir, kWritePermission)) {
00174 Printf("getProof: unable to get a writable dataset directory (tried: %s)"
00175 " - cannot continue", datasetdir.Data());
00176 return p;
00177 }
00178 Printf("getProof: dataset dir: %s", datasetdir.Data());
00179 }
00180 }
00181
00182
00183 TUrl u(refloc);
00184 u.SetProtocol("proof");
00185 if (!strcmp(uu.GetHost(), uref.GetHost()) && (uu.GetPort() != uref.GetPort()))
00186 u.SetPort(uu.GetPort());
00187 Int_t lportp = u.GetPort();
00188 Int_t lportx = lportp + 1;
00189 TString lurl = u.GetUrl();
00190
00191
00192 TString workarea = Form("%s/proof", tutdir.Data());
00193 TString xpdcf(Form("%s/xpd.cf",tutdir.Data()));
00194 TString xpdlog(Form("%s/xpd.log",tutdir.Data()));
00195 TString xpdlogprt(Form("%s/xpdtut/xpd.log",tutdir.Data()));
00196 TString xpdpid(Form("%s/xpd.pid",tutdir.Data()));
00197 TString proofsessions(Form("%s/sessions",tutdir.Data()));
00198 TString cmd;
00199 Int_t rc = 0;
00200
00201
00202 Int_t pid = -1;
00203 Bool_t restart = kTRUE;
00204 if ((rc = checkXproofdAt(lportp)) == 1) {
00205 Printf("getProof: something else the a XProofd service is running on"
00206 " port %d - cannot continue", lportp);
00207 return p;
00208
00209 } else if (rc == 0) {
00210
00211 restart = kFALSE;
00212
00213 pid = getXrootdPid(lportx);
00214 Printf("getProof: daemon found listening on dedicated ports {%d,%d} (pid: %d)",
00215 lportx, lportp, pid);
00216 if (isatty(0) == 0 || isatty(1) == 0) {
00217
00218 restart = kTRUE;
00219 } else {
00220 if (!strcmp(opt,"ask")) {
00221 char *answer = Getline("getProof: would you like to restart it (N,Y)? [N] ");
00222 if (answer && (answer[0] == 'Y' || answer[0] == 'y'))
00223 restart = kTRUE;
00224 }
00225 }
00226 if (!strcmp(opt,"force"))
00227
00228 restart = kTRUE;
00229
00230
00231 if (restart) {
00232
00233 Printf("getProof: cleaning existing instance ...");
00234
00235 cmd = Form("kill -9 %d", pid);
00236 if ((rc = gSystem->Exec(cmd)) != 0)
00237 Printf("getProof: problems stopping xrootd process %d (%d)", pid, rc);
00238
00239 Printf("getProof: wait 5 secs so that previous connections are cleaned ...");
00240 gSystem->Sleep(5000);
00241 }
00242 }
00243
00244 if (restart) {
00245
00246 char *xrootd = gSystem->Which(gSystem->Getenv("PATH"), "xrootd", kExecutePermission);
00247 if (!xrootd) {
00248 Printf("getProof: xrootd not found: please check the environment!");
00249 return p;
00250 }
00251
00252
00253 cmd = Form("rm -fr %s/*", tutdir.Data());
00254 gSystem->Exec(cmd);
00255
00256
00257 FILE *fcf = fopen(xpdcf.Data(), "w");
00258 if (!fcf) {
00259 Printf("getProof: could not create config file for XPD (%s)", xpdcf.Data());
00260 return p;
00261 }
00262 fprintf(fcf,"### Use admin path at %s/admin to avoid interferences with other users\n", tutdir.Data());
00263 fprintf(fcf,"xrd.adminpath %s/admin\n", tutdir.Data());
00264 #if defined(R__MACOSX)
00265 fprintf(fcf,"### Use dedicated socket path under /tmp to avoid length problems\n");
00266 fprintf(fcf,"xpd.sockpathdir /tmp/xpd-sock\n");
00267 #endif
00268 fprintf(fcf,"### Run data serving on port %d\n", lportp+1);
00269 fprintf(fcf,"xrd.port %d\n", lportp+1);
00270 fprintf(fcf,"### Load the XrdProofd protocol on port %d\n", lportp);
00271 fprintf(fcf,"xrd.protocol xproofd libXrdProofd.so\n");
00272 fprintf(fcf,"xpd.port %d\n", lportp);
00273 if (nwrks > 0) {
00274 fprintf(fcf,"### Force number of local workers\n");
00275 fprintf(fcf,"xpd.localwrks %d\n", nwrks);
00276 }
00277 fprintf(fcf,"### Root path for working dir\n");
00278 fprintf(fcf,"xpd.workdir %s\n", workarea.Data());
00279 fprintf(fcf,"### Allow different users to connect\n");
00280 fprintf(fcf,"xpd.multiuser 1\n");
00281 fprintf(fcf,"### Limit the number of query results kept in the master sandbox\n");
00282 fprintf(fcf,"xpd.putrc ProofServ.UserQuotas: maxquerykept=10\n");
00283 if (tutords) {
00284 fprintf(fcf,"### Use dataset directory under the tutorial dir\n");
00285 fprintf(fcf,"xpd.datasetsrc file url:%s opt:-Cq:Av:As:\n", datasetdir.Data());
00286 }
00287 if (dyn) {
00288 fprintf(fcf,"### Use dynamic, per-job scheduling\n");
00289 fprintf(fcf,"xpd.putrc Proof.DynamicStartup 1\n");
00290 }
00291 fprintf(fcf,"### Local data server for the temporary output files\n");
00292 fprintf(fcf,"xpd.putenv LOCALDATASERVER=root://%s:%d\n", gSystem->HostName(), lportx);
00293 fclose(fcf);
00294 Printf("getProof: xrootd config file at %s", xpdcf.Data());
00295
00296
00297 Printf("getProof: xrootd log file at %s", xpdlogprt.Data());
00298 cmd = Form("%s -c %s -b -l %s -n xpdtut -p %d",
00299 xrootd, xpdcf.Data(), xpdlog.Data(), lportx);
00300 Printf("(NB: any error line from XrdClientSock::RecvRaw and XrdClientMessage::ReadRaw should be ignored)");
00301 if ((rc = gSystem->Exec(cmd)) != 0) {
00302 Printf("getProof: problems starting xrootd (%d)", rc);
00303 return p;
00304 }
00305 delete[] xrootd;
00306
00307
00308 Printf("getProof: waiting for xrootd to start ...");
00309 gSystem->Sleep(2000);
00310
00311 pid = getXrootdPid(lportx);
00312 Printf("getProof: xrootd pid: %d", pid);
00313
00314
00315 FILE *fpid = fopen(xpdpid.Data(), "w");
00316 if (!fpid) {
00317 Printf("getProof: could not create pid file for XPD");
00318 } else {
00319 fprintf(fpid,"%d\n", pid);
00320 fclose(fpid);
00321 }
00322 }
00323 Printf("getProof: start / attach the PROOF session ...");
00324
00325
00326 p = TProof::Open(lurl, vopt.Data());
00327 if (!p || !(p->IsValid())) {
00328 Printf("getProof: starting local session failed");
00329 if (p) delete p;
00330 p = 0;
00331 return p;
00332 }
00333
00334
00335 return p;
00336 #endif
00337 }
00338
00339 Int_t getXrootdPid(Int_t port, const char *subdir)
00340 {
00341 #ifdef WIN32
00342
00343 Printf("getXrootdPid: Xrootd/Proof not supported on Windows, sorry!");
00344 return -1;
00345 #else
00346
00347 Int_t pid = -1;
00348 #if defined(__sun)
00349 const char *com = "-eo pid,comm";
00350 #elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__)
00351 const char *com = "ax -w -w";
00352 #else
00353 const char *com = "-w -w -eo pid,command";
00354 #endif
00355 TString cmd;
00356 if (subdir && strlen(subdir) > 0) {
00357 cmd.Form("ps %s | grep xrootd | grep \"\\-p %d\" | grep %s", com, port, subdir);
00358 } else {
00359 cmd.Form("ps %s | grep xrootd | grep \"\\-p %d\"", com, port);
00360 }
00361 FILE *fp = gSystem->OpenPipe(cmd.Data(), "r");
00362 if (fp) {
00363 char line[2048], rest[2048];
00364 while (fgets(line, sizeof(line), fp)) {
00365 sscanf(line,"%d %s", &pid, rest);
00366 break;
00367 }
00368 gSystem->ClosePipe(fp);
00369 }
00370
00371 return pid;
00372 #endif
00373 }
00374
00375 Int_t checkXrootdAt(Int_t port, const char *host)
00376 {
00377
00378
00379
00380
00381
00382
00383
00384 TSocket s(host, port);
00385 if (!(s.IsValid())) {
00386 if (gDebug > 0)
00387 Printf("checkXrootdAt: could not open connection to %s:%d", host, port);
00388 return -1;
00389 }
00390
00391 clnt_HS_t initHS;
00392 memset(&initHS, 0, sizeof(initHS));
00393 initHS.fourth = host2net((int)4);
00394 initHS.fifth = host2net((int)2012);
00395 int len = sizeof(initHS);
00396 s.SendRaw(&initHS, len);
00397
00398 int type;
00399 len = sizeof(type);
00400 int readCount = s.RecvRaw(&type, len);
00401 if (readCount != len) {
00402 if (gDebug > 0)
00403 Printf("checkXrootdAt: 1st: wrong number of bytes read: %d (expected: %d)",
00404 readCount, len);
00405 return 1;
00406 }
00407
00408 type = net2host(type);
00409
00410 if (type == 0) {
00411 srv_HS_t xbody;
00412 len = sizeof(xbody);
00413 readCount = s.RecvRaw(&xbody, len);
00414 if (readCount != len) {
00415 if (gDebug > 0)
00416 Printf("checkXrootdAt: 2nd: wrong number of bytes read: %d (expected: %d)",
00417 readCount, len);
00418 return 1;
00419 }
00420
00421 } else if (type == 8) {
00422
00423 if (gDebug > 0)
00424 Printf("checkXrootdAt: server is ROOTD");
00425 return 1;
00426 } else {
00427
00428 if (gDebug > 0)
00429 Printf("checkXrootdAt: unknown server type: %d", type);
00430 return 1;
00431 }
00432
00433 return 0;
00434 }
00435
00436 Int_t checkXproofdAt(Int_t port, const char *host)
00437 {
00438
00439
00440
00441
00442
00443
00444
00445 TSocket s(host, port);
00446 if (!(s.IsValid())) {
00447 if (gDebug > 0)
00448 Printf("checkXproofdAt: could not open connection to %s:%d", host, port);
00449 return -1;
00450 }
00451
00452 clnt_HS_t initHS;
00453 memset(&initHS, 0, sizeof(initHS));
00454 initHS.third = (int)host2net((int)1);
00455 int len = sizeof(initHS);
00456 s.SendRaw(&initHS, len);
00457
00458 int dum[2];
00459 dum[0] = (int)host2net((int)4);
00460 dum[1] = (int)host2net((int)2012);
00461 s.SendRaw(&dum[0], sizeof(dum));
00462
00463 int type;
00464 len = sizeof(type);
00465 int readCount = s.RecvRaw(&type, len);
00466 if (readCount != len) {
00467 if (gDebug > 0)
00468 Printf("checkXproofdAt: 1st: wrong number of bytes read: %d (expected: %d)",
00469 readCount, len);
00470 return 1;
00471 }
00472
00473 type = net2host(type);
00474
00475 if (type == 0) {
00476 srv_HS_t xbody;
00477 len = sizeof(xbody);
00478 readCount = s.RecvRaw(&xbody, len);
00479 if (readCount != len) {
00480 if (gDebug > 0)
00481 Printf("checkXproofdAt: 2nd: wrong number of bytes read: %d (expected: %d)",
00482 readCount, len);
00483 return 1;
00484 }
00485 xbody.protover = net2host(xbody.protover);
00486 xbody.msgval = net2host(xbody.msglen);
00487 xbody.msglen = net2host(xbody.msgval);
00488
00489 } else if (type == 8) {
00490
00491 if (gDebug > 0)
00492 Printf("checkXproofdAt: server is PROOFD");
00493 return 1;
00494 } else {
00495
00496 if (gDebug > 0)
00497 Printf("checkXproofdAt: unknown server type: %d", type);
00498 return 1;
00499 }
00500
00501 return 0;
00502 }
00503
00504 Int_t startXrootdAt(Int_t port, const char *exportdirs, Bool_t force)
00505 {
00506
00507
00508
00509 #ifdef WIN32
00510
00511 Printf("startXrootdAt: Xrootd not supported on Windows, sorry!");
00512 return -1;
00513 #else
00514 Bool_t restart = kTRUE;
00515
00516
00517 Int_t rc = 0;
00518 if ((rc = checkXrootdAt(port)) == 1) {
00519
00520 Printf("startXrootdAt: some other service running on port %d - cannot proceed ", port);
00521 return -1;
00522
00523 } else if (rc == 0) {
00524
00525 restart = kFALSE;
00526
00527 if (force) {
00528
00529 restart = kTRUE;
00530 } else {
00531 Printf("startXrootdAt: xrootd service already available on port %d: ", port);
00532 char *answer = Getline("startXrootdAt: would you like to restart it (N,Y)? [N] ");
00533 if (answer && (answer[0] == 'Y' || answer[0] == 'y')) {
00534 restart = kTRUE;
00535 }
00536 }
00537
00538
00539 if (restart) {
00540 Printf("startXrootdAt: cleaning existing instance ...");
00541
00542
00543 Int_t pid = getXrootdPid(port, "xrd-basic");
00544
00545
00546 TString cmd = Form("kill -9 %d", pid);
00547 if ((rc = gSystem->Exec(cmd)) != 0)
00548 Printf("startXrootdAt: problems stopping xrootd process %d (%d)", pid, rc);
00549 }
00550 }
00551
00552 if (restart) {
00553 if (gSystem->AccessPathName("/tmp/xrd-basic")) {
00554 gSystem->mkdir("/tmp/xrd-basic");
00555 if (gSystem->AccessPathName("/tmp/xrd-basic")) {
00556 Printf("startXrootdAt: could not assert dir for log file");
00557 return -1;
00558 }
00559 }
00560 TString cmd;
00561 cmd.Form("xrootd -d -p %d -b -l /tmp/xrd-basic/xrootd.log", port);
00562 if (exportdirs && strlen(exportdirs) > 0) {
00563 TString dirs(exportdirs), d;
00564 Int_t from = 0;
00565 while (dirs.Tokenize(d, from, " ")) {
00566 if (!d.IsNull()) {
00567 cmd += " ";
00568 cmd += d;
00569 }
00570 }
00571 }
00572 Printf("cmd: %s", cmd.Data());
00573 if ((rc = gSystem->Exec(cmd)) != 0) {
00574 Printf("startXrootdAt: problems executing starting command (%d)", rc);
00575 return -1;
00576 }
00577
00578 Printf("startXrootdAt: waiting for xrootd to start ...");
00579 gSystem->Sleep(2000);
00580
00581 if ((rc = checkXrootdAt(port)) != 0) {
00582 Printf("startXrootdAt: xrootd service not available at %d (rc = %d) - startup failed",
00583 port, rc);
00584 return -1;
00585 }
00586 Printf("startXrootdAt: basic xrootd started!");
00587 }
00588
00589
00590 return 0;
00591 #endif
00592 }
00593
00594 Int_t killXrootdAt(Int_t port, const char *id)
00595 {
00596
00597
00598 #ifdef WIN32
00599
00600 Printf("killXrootdAt: Xrootd not supported on Windows, sorry!");
00601 return -1;
00602 #else
00603
00604 Int_t pid = -1, rc= 0;
00605 if ((pid = getXrootdPid(port, id)) > 0) {
00606
00607
00608 TString cmd = Form("kill -9 %d", pid);
00609 if ((rc = gSystem->Exec(cmd)) != 0)
00610 Printf("killXrootdAt: problems stopping xrootd process %d (%d)", pid, rc);
00611 }
00612
00613
00614 return rc;
00615 #endif
00616 }
00617