00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "TProofOutputFile.h"
00021 #include <TEnv.h>
00022 #include <TFileCollection.h>
00023 #include <TFileInfo.h>
00024 #include <TFileMerger.h>
00025 #include <TFile.h>
00026 #include <TList.h>
00027 #include <TObjArray.h>
00028 #include <TObject.h>
00029 #include <TObjString.h>
00030 #include <TProofServ.h>
00031 #include <TSystem.h>
00032 #include <TUUID.h>
00033
00034 ClassImp(TProofOutputFile)
00035
00036
00037 TProofOutputFile::TProofOutputFile(const char *path,
00038 ERunType type, UInt_t opt, const char *dsname)
00039 : TNamed(path, ""), fRunType(type), fTypeOpt(opt)
00040 {
00041
00042
00043 fIsLocal = kFALSE;
00044 fMerged = kFALSE;
00045 fMerger = 0;
00046 fDataSet = 0;
00047
00048 Init(path, dsname);
00049 }
00050
00051
00052 TProofOutputFile::TProofOutputFile(const char *path,
00053 const char *option, const char *dsname)
00054 : TNamed(path, "")
00055 {
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 fIsLocal = kFALSE;
00069 fMerged = kFALSE;
00070 fMerger = 0;
00071 fDataSet = 0;
00072
00073
00074 fRunType = kMerge;
00075 fTypeOpt = kRemote;
00076 if (option && strlen(option) > 0) {
00077 TString opt(option);
00078 if (opt.Contains("L") || (opt == "LOCAL")) fTypeOpt = kLocal;
00079 if (!opt.Contains("M") && opt.Contains("D")) {
00080
00081 fRunType = kDataset;
00082 fTypeOpt = kCreate;
00083 if (opt.Contains("R")) fTypeOpt = (ETypeOpt) (fTypeOpt | kRegister);
00084 if (opt.Contains("O")) fTypeOpt = (ETypeOpt) (fTypeOpt | kOverwrite);
00085 if (opt.Contains("V")) fTypeOpt = (ETypeOpt) (fTypeOpt | kVerify);
00086 }
00087 }
00088
00089 Init(path, dsname);
00090 }
00091
00092
00093 void TProofOutputFile::Init(const char *path, const char *dsname)
00094 {
00095
00096
00097 fLocalHost = TUrl(gSystem->HostName()).GetHostFQDN();
00098 Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
00099 if (port > -1) {
00100 fLocalHost += ":";
00101 fLocalHost += port;
00102 }
00103
00104 TUrl u(path, kTRUE);
00105
00106 fFileName = u.GetFile();
00107
00108 SetName(gSystem->BaseName(fFileName.Data()));
00109 if (dsname && strlen(dsname) > 0) {
00110
00111 SetTitle(dsname);
00112 } else {
00113
00114 SetTitle(GetName());
00115 }
00116
00117 if (u.GetOptions() && strlen(u.GetOptions()) > 0)
00118 fOptionsAnchor += TString::Format("?%s", u.GetOptions());
00119 if (u.GetAnchor() && strlen(u.GetAnchor()) > 0)
00120 fOptionsAnchor += TString::Format("#%s", u.GetAnchor());
00121
00122 fIsLocal = kFALSE;
00123 fDir = u.GetUrl();
00124 Int_t pos = fDir.Index(fFileName);
00125 if (pos != kNPOS) fDir.Remove(pos);
00126 fRawDir = fDir;
00127
00128 if (fDir == "file:") {
00129 fIsLocal = kTRUE;
00130
00131
00132
00133 TString dirPath = gSystem->DirName(fFileName);
00134 TString dirData = (!IsMerge() && gProofServ) ? gProofServ->GetDataDir()
00135 : gSystem->WorkingDirectory();
00136 if ((dirPath[0] == '/') && !dirPath.BeginsWith(dirData)) {
00137 Warning("Init", "not allowed to create files under '%s' - chrooting to '%s'",
00138 dirPath.Data(), dirData.Data());
00139 dirPath.Insert(0, dirData);
00140 } else if (dirPath.BeginsWith("..")) {
00141 dirPath.Remove(0, 2);
00142 if (dirPath[0] != '/') dirPath.Insert(0, "/");
00143 dirPath.Insert(0, dirData);
00144 } else if (dirPath[0] == '.' || dirPath[0] == '~') {
00145 dirPath.Remove(0, 1);
00146 if (dirPath[0] != '/') dirPath.Insert(0, "/");
00147 dirPath.Insert(0, dirData);
00148 } else if (dirPath.IsNull()) {
00149 dirPath = dirData;
00150 }
00151
00152
00153 if (!IsMerge() && gProofServ) {
00154 if (!dirPath.Contains(gProofServ->GetOrdinal())) {
00155 if (!dirPath.EndsWith("/")) dirPath += "/";
00156 dirPath += gProofServ->GetOrdinal();
00157 }
00158 if (!dirPath.Contains(gProofServ->GetSessionTag())) {
00159 if (!dirPath.EndsWith("/")) dirPath += "/";
00160 dirPath += gProofServ->GetSessionTag();
00161 }
00162 if (!dirPath.Contains("<qnum>")) {
00163 if (!dirPath.EndsWith("/")) dirPath += "/";
00164 dirPath += "<qnum>";
00165 }
00166 }
00167
00168 TProofServ::ResolveKeywords(dirPath, 0);
00169
00170 fRawDir = dirPath;
00171
00172
00173
00174 TString existsPath(dirPath);
00175 TList subPaths;
00176 while (existsPath != "/" && existsPath != "." && gSystem->AccessPathName(existsPath)) {
00177 subPaths.AddFirst(new TObjString(gSystem->BaseName(existsPath)));
00178 existsPath = gSystem->DirName(existsPath);
00179 }
00180 subPaths.SetOwner(kTRUE);
00181 FileStat_t st;
00182 if (gSystem->GetPathInfo(existsPath, st) == 0) {
00183 TString xpath = existsPath;
00184 TIter nxp(&subPaths);
00185 TObjString *os = 0;
00186 while ((os = (TObjString *) nxp())) {
00187 xpath += TString::Format("/%s", os->GetName());
00188 if (gSystem->mkdir(xpath, kTRUE) == 0) {
00189 if (gSystem->Chmod(xpath, (UInt_t) st.fMode) != 0)
00190 Warning("Init", "problems setting mode on '%s'", xpath.Data());
00191 } else {
00192 Error("Init", "problems creating path '%s'", xpath.Data());
00193 }
00194 }
00195 } else {
00196 Warning("Init", "could not get info for path '%s': will only try to create"
00197 " the full path w/o trying to set the mode", existsPath.Data());
00198 if (gSystem->mkdir(existsPath, kTRUE) != 0)
00199 Error("Init", "problems creating path '%s'", existsPath.Data());
00200 }
00201
00202 TString pfx = gEnv->GetValue("Path.Localroot","");
00203 if (!pfx.IsNull() && dirPath.BeginsWith(pfx)) dirPath.Remove(0, pfx.Length());
00204
00205 if (gSystem->Getenv("LOCALDATASERVER")) {
00206 fDir = gSystem->Getenv("LOCALDATASERVER");
00207 if (!fDir.EndsWith("/")) fDir += "/";
00208 }
00209 fDir += dirPath;
00210 } else {
00211
00212 TProofServ::ResolveKeywords(fFileName, 0);
00213 }
00214
00215 Info("Init", "dir: %s (raw: %s)", fDir.Data(), fRawDir.Data());
00216
00217
00218 fOutputFileName = gEnv->GetValue("Proof.OutputFile", "<file>");
00219
00220 TString fileName = path;
00221 if (!fileName.EndsWith(".root")) fileName += ".root";
00222
00223 if (!fOutputFileName.IsNull() && !fOutputFileName.Contains("<file>")) {
00224 if (!fOutputFileName.EndsWith("/")) fOutputFileName += "/";
00225 fOutputFileName += fileName;
00226 }
00227
00228 fileName.ReplaceAll("<ord>","");
00229 TProofServ::ResolveKeywords(fOutputFileName, fileName);
00230 Info("Init", "output file url: %s", fOutputFileName.Data());
00231
00232 fWorkerOrdinal = "<ord>";
00233 TProofServ::ResolveKeywords(fWorkerOrdinal, 0);
00234 }
00235
00236
00237 TProofOutputFile::~TProofOutputFile()
00238 {
00239
00240
00241 if (fDataSet) delete fDataSet;
00242 if (fMerger) delete fMerger;
00243 }
00244
00245
00246 void TProofOutputFile::SetFileName(const char* name)
00247 {
00248
00249
00250 fFileName = name;
00251 }
00252
00253
00254 void TProofOutputFile::SetOutputFileName(const char *name)
00255 {
00256
00257
00258 if (name && strlen(name) > 0) {
00259 fOutputFileName = name;
00260 TProofServ::ResolveKeywords(fOutputFileName);
00261 Info("SetOutputFileName", "output file url: %s", fOutputFileName.Data());
00262 } else {
00263 fOutputFileName = "";
00264 }
00265 }
00266
00267
00268 TFile* TProofOutputFile::OpenFile(const char* opt)
00269 {
00270
00271
00272 if (fFileName.IsNull()) return 0;
00273
00274
00275 TString fileLoc;
00276 fileLoc.Form("%s/%s%s", fRawDir.Data(), fFileName.Data(), fOptionsAnchor.Data());
00277
00278
00279 TFile *retFile = TFile::Open(fileLoc, opt);
00280
00281 return retFile;
00282 }
00283
00284
00285 Int_t TProofOutputFile::AdoptFile(TFile *f)
00286 {
00287
00288
00289
00290 if (!f || f->IsZombie())
00291 return -1;
00292
00293
00294 TUrl u(*(f->GetEndpointUrl()));
00295 fIsLocal = kFALSE;
00296 if (!strcmp(u.GetProtocol(), "file")) {
00297 fIsLocal = kTRUE;
00298 fDir = u.GetFile();
00299 } else {
00300 fDir = u.GetUrl();
00301 }
00302 fFileName = gSystem->BaseName(fDir.Data());
00303 fDir.ReplaceAll(fFileName, "");
00304 fRawDir = fDir;
00305
00306
00307 TString pfx = gEnv->GetValue("Path.Localroot","");
00308 if (!pfx.IsNull()) fDir.ReplaceAll(pfx, "");
00309
00310 if (gSystem->Getenv("LOCALDATASERVER")) {
00311 TString localDS(gSystem->Getenv("LOCALDATASERVER"));
00312 if (!localDS.EndsWith("/")) localDS += "/";
00313 fDir.Insert(0, localDS);
00314 }
00315
00316 return 0;
00317 }
00318
00319
00320 Long64_t TProofOutputFile::Merge(TCollection* list)
00321 {
00322
00323
00324
00325 if(!list || list->IsEmpty()) return 0;
00326
00327 if (IsMerge()) {
00328
00329 TString fileLoc;
00330 TString outputFileLoc = (fOutputFileName.IsNull()) ? fFileName : fOutputFileName;
00331
00332 Bool_t localMerge = (fRunType == kMerge && fTypeOpt == kLocal) ? kTRUE : kFALSE;
00333 TFileMerger *merger = GetFileMerger(localMerge);
00334 if (!merger) {
00335 Error("Merge", "could not instantiate the file merger");
00336 return -1;
00337 }
00338
00339 if (!fMerged) {
00340 merger->OutputFile(outputFileLoc);
00341 fileLoc.Form("%s/%s", fDir.Data(), GetFileName());
00342 AddFile(merger, fileLoc);
00343 fMerged = kTRUE;
00344 }
00345
00346 TIter next(list);
00347 TObject *o = 0;
00348 while((o = next())) {
00349 TProofOutputFile *pFile = dynamic_cast<TProofOutputFile *>(o);
00350 if (pFile) {
00351 fileLoc = Form("%s/%s", pFile->GetDir(), pFile->GetFileName());
00352 AddFile(merger, fileLoc);
00353 }
00354 }
00355 } else {
00356
00357 TUrl mssUrl(gEnv->GetValue("ProofServ.PoolUrl",""));
00358
00359 TFileCollection *dataset = GetFileCollection();
00360 if (!dataset) {
00361 Error("Merge", "could not instantiate the file collection");
00362 return -1;
00363 }
00364 TString path;
00365 TFileInfo *fi = 0;
00366
00367 dataset->Update();
00368 if (dataset->GetNFiles() == 0) {
00369
00370 path.Form("%s/%s%s", GetDir(), GetFileName(), GetOptionsAnchor());
00371 fi = new TFileInfo(path);
00372
00373 if (mssUrl.IsValid()) {
00374 TUrl ur(fi->GetFirstUrl()->GetUrl());
00375 ur.SetProtocol(mssUrl.GetProtocol());
00376 ur.SetHost(mssUrl.GetHost());
00377 ur.SetPort(mssUrl.GetPort());
00378 if (mssUrl.GetUser() && strlen(mssUrl.GetUser()) > 0)
00379 ur.SetUser(mssUrl.GetUser());
00380 fi->AddUrl(ur.GetUrl());
00381 }
00382
00383 path.Form("%s/%s?node=%s", GetDir(kTRUE), GetFileName(), GetLocalHost());
00384 fi->AddUrl(path);
00385 fi->Print();
00386
00387 dataset->Add(fi);
00388 }
00389
00390 TIter next(list);
00391 TObject *o = 0;
00392 while((o = next())) {
00393 TProofOutputFile *pFile = dynamic_cast<TProofOutputFile *>(o);
00394 if (pFile) {
00395
00396 path.Form("%s/%s%s", pFile->GetDir(), pFile->GetFileName(), pFile->GetOptionsAnchor());
00397 fi = new TFileInfo(path);
00398
00399 if (mssUrl.IsValid()) {
00400 TUrl ur(fi->GetFirstUrl()->GetUrl());
00401 ur.SetProtocol(mssUrl.GetProtocol());
00402 ur.SetHost(mssUrl.GetHost());
00403 ur.SetPort(mssUrl.GetPort());
00404 if (mssUrl.GetUser() && strlen(mssUrl.GetUser()) > 0)
00405 ur.SetUser(mssUrl.GetUser());
00406 fi->AddUrl(ur.GetUrl());
00407 }
00408
00409 path.Form("%s/%s?node=%s", pFile->GetDir(kTRUE), pFile->GetFileName(), pFile->GetLocalHost());
00410 fi->AddUrl(path);
00411 fi->Print();
00412
00413 dataset->Add(fi);
00414 }
00415 }
00416 }
00417
00418
00419 return 0;
00420 }
00421
00422
00423 void TProofOutputFile::Print(Option_t *) const
00424 {
00425
00426
00427 Info("Print","-------------- %s : start (%s) ------------", GetName(), fLocalHost.Data());
00428 Info("Print"," dir: %s", fDir.Data());
00429 Info("Print"," raw dir: %s", fRawDir.Data());
00430 Info("Print"," file name: %s%s", fFileName.Data(), fOptionsAnchor.Data());
00431 if (IsMerge()) {
00432 Info("Print"," run type: create a merged file");
00433 Info("Print"," merging option: %s",
00434 (fTypeOpt == kLocal) ? "local copy" : "keep remote");
00435 } else {
00436 TString opt;
00437 if ((fTypeOpt & kRegister)) opt += "R";
00438 if ((fTypeOpt & kOverwrite)) opt += "O";
00439 if ((fTypeOpt & kVerify)) opt += "V";
00440 Info("Print"," run type: create dataset (name: '%s', opt: '%s')",
00441 GetTitle(), opt.Data());
00442 }
00443 Info("Print"," output file name: %s", fOutputFileName.Data());
00444 Info("Print"," ordinal: %s", fWorkerOrdinal.Data());
00445 Info("Print","-------------- %s : done -------------", GetName());
00446
00447 return;
00448 }
00449
00450
00451 void TProofOutputFile::NotifyError(const char *msg)
00452 {
00453
00454
00455 if (msg) {
00456 if (gProofServ)
00457 gProofServ->SendAsynMessage(msg);
00458 else
00459 Printf("%s", msg);
00460 } else {
00461 Info("NotifyError","called with empty message");
00462 }
00463
00464 return;
00465 }
00466
00467
00468 void TProofOutputFile::AddFile(TFileMerger *merger, const char *path)
00469 {
00470
00471
00472 if (merger && path) {
00473 if (!merger->AddFile(path))
00474 NotifyError(Form("TProofOutputFile::AddFile:"
00475 " error from TFileMerger::AddFile(%s)", path));
00476 }
00477 }
00478
00479
00480 void TProofOutputFile::Unlink(const char *path)
00481 {
00482
00483
00484 if (path) {
00485 if (!gSystem->AccessPathName(path)) {
00486 if (gSystem->Unlink(path) != 0)
00487 NotifyError(Form("TProofOutputFile::Unlink:"
00488 " error from TSystem::Unlink(%s)", path));
00489 }
00490 }
00491 }
00492
00493
00494 TFileCollection *TProofOutputFile::GetFileCollection()
00495 {
00496
00497
00498 if (!fDataSet)
00499 fDataSet = new TFileCollection(GetTitle());
00500 return fDataSet;
00501 }
00502
00503
00504 TFileMerger *TProofOutputFile::GetFileMerger(Bool_t local)
00505 {
00506
00507
00508 if (!fMerger)
00509 fMerger = new TFileMerger(local);
00510 return fMerger;
00511 }