TProofOutputFile.cxx

Go to the documentation of this file.
00001 // @(#)root/proof:$Id: TProofOutputFile.cxx 38381 2011-03-11 12:51:58Z ganis $
00002 // Author: Long Tran-Thanh   14/09/07
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TProofOutputFile                                                     //
00015 //                                                                      //
00016 // Small class to steer the merging of files produced on the workers    //
00017 //                                                                      //
00018 //////////////////////////////////////////////////////////////////////////
00019 
00020 #include "TProofOutputFile.h"
00021 #include <TEnv.h>
00022 #include <TFileCollection.h>
00023 #include <TFileInfo.h>
00024 #include <TFileMerger.h>
00025 #include <TFile.h>
00026 #include <TList.h>
00027 #include <TObjArray.h>
00028 #include <TObject.h>
00029 #include <TObjString.h>
00030 #include <TProofServ.h>
00031 #include <TSystem.h>
00032 #include <TUUID.h>
00033 
00034 ClassImp(TProofOutputFile)
00035 
00036 //________________________________________________________________________________
00037 TProofOutputFile::TProofOutputFile(const char *path,
00038                                    ERunType type, UInt_t opt, const char *dsname)
00039                  : TNamed(path, ""), fRunType(type), fTypeOpt(opt)
00040 {
00041    // Main constructor
00042 
00043    fIsLocal = kFALSE;
00044    fMerged = kFALSE;
00045    fMerger = 0;
00046    fDataSet = 0;
00047 
00048    Init(path, dsname);
00049 }
00050 
00051 //________________________________________________________________________________
00052 TProofOutputFile::TProofOutputFile(const char *path,
00053                                    const char *option, const char *dsname)
00054                  : TNamed(path, "")
00055 {
00056    // Constructor with the old signature, kept for convenience and backard compatibility.
00057    // Options:
00058    //             'M'      merge: finally merge the created files
00059    //             'L'      local: copy locally the files before merging (implies 'M')
00060    //             'D'      dataset: create a TFileCollection
00061    //             'R'      register: dataset run with dataset registration
00062    //             'O'      overwrite: force dataset replacement during registration
00063    //             'V'      verify: verify the registered dataset
00064    // Special 'option' values for backward compatibility:
00065    //              ""      equivalent to "M"
00066    //         "LOCAL"      equivalent to "ML" or "L"
00067 
00068    fIsLocal = kFALSE;
00069    fMerged = kFALSE;
00070    fMerger = 0;
00071    fDataSet = 0;
00072 
00073    // Fill the run type and option type
00074    fRunType = kMerge;
00075    fTypeOpt = kRemote;
00076    if (option && strlen(option) > 0) {
00077       TString opt(option);
00078       if (opt.Contains("L") || (opt == "LOCAL")) fTypeOpt = kLocal;
00079       if (!opt.Contains("M") && opt.Contains("D")) {
00080          // Dataset creation mode
00081          fRunType = kDataset;
00082          fTypeOpt = kCreate;
00083          if (opt.Contains("R")) fTypeOpt = (ETypeOpt) (fTypeOpt | kRegister);
00084          if (opt.Contains("O")) fTypeOpt = (ETypeOpt) (fTypeOpt | kOverwrite);
00085          if (opt.Contains("V")) fTypeOpt = (ETypeOpt) (fTypeOpt | kVerify);
00086       }
00087    }
00088 
00089    Init(path, dsname);
00090 }
00091 
00092 //________________________________________________________________________________
00093 void TProofOutputFile::Init(const char *path, const char *dsname)
00094 {
00095    // Initializer. Called by all constructors
00096 
00097    fLocalHost = TUrl(gSystem->HostName()).GetHostFQDN();
00098    Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
00099    if (port > -1) {
00100       fLocalHost += ":";
00101       fLocalHost += port;
00102    }
00103 
00104    TUrl u(path, kTRUE);
00105    // File name
00106    fFileName = u.GetFile();
00107    // The name is used to identify this entity
00108    SetName(gSystem->BaseName(fFileName.Data()));
00109    if (dsname && strlen(dsname) > 0) {
00110       // This is the dataset name in case such option is chosen
00111       SetTitle(dsname);
00112    } else {
00113       // Default dataset name
00114       SetTitle(GetName());
00115    }
00116    // Options and anchor, if any
00117    if (u.GetOptions() && strlen(u.GetOptions()) > 0)
00118       fOptionsAnchor += TString::Format("?%s", u.GetOptions());
00119    if (u.GetAnchor() && strlen(u.GetAnchor()) > 0)
00120       fOptionsAnchor += TString::Format("#%s", u.GetAnchor());
00121    // Path
00122    fIsLocal = kFALSE;
00123    fDir = u.GetUrl();
00124    Int_t pos = fDir.Index(fFileName);
00125    if (pos != kNPOS) fDir.Remove(pos);
00126    fRawDir = fDir;
00127 
00128    if (fDir == "file:") {
00129       fIsLocal = kTRUE;
00130       // For local files, the user is allowed to create files under the assigned directories
00131       // If this is not the case, the file is rooted automatically to the assigned dir which
00132       // is the datadir for dataset creation runs, and the working dir for merging runs
00133       TString dirPath = gSystem->DirName(fFileName);
00134       TString dirData = (!IsMerge() && gProofServ) ? gProofServ->GetDataDir()
00135                                                    : gSystem->WorkingDirectory();
00136       if ((dirPath[0] == '/') && !dirPath.BeginsWith(dirData)) {
00137          Warning("Init", "not allowed to create files under '%s' - chrooting to '%s'",
00138                          dirPath.Data(), dirData.Data());
00139          dirPath.Insert(0, dirData);
00140       } else if (dirPath.BeginsWith("..")) {
00141          dirPath.Remove(0, 2);
00142          if (dirPath[0] != '/') dirPath.Insert(0, "/");
00143          dirPath.Insert(0, dirData);
00144       } else if (dirPath[0] == '.' || dirPath[0] == '~') {
00145          dirPath.Remove(0, 1);
00146          if (dirPath[0] != '/') dirPath.Insert(0, "/");
00147          dirPath.Insert(0, dirData);
00148       } else if (dirPath.IsNull()) {
00149          dirPath = dirData;
00150       }
00151       // Make sure that session-tag, ordinal and query sequential number are present otherwise
00152       // we may override outputs from other workers
00153       if (!IsMerge() && gProofServ) {
00154          if (!dirPath.Contains(gProofServ->GetOrdinal())) {
00155             if (!dirPath.EndsWith("/")) dirPath += "/";
00156             dirPath += gProofServ->GetOrdinal();
00157          }
00158          if (!dirPath.Contains(gProofServ->GetSessionTag())) {
00159             if (!dirPath.EndsWith("/")) dirPath += "/";
00160             dirPath += gProofServ->GetSessionTag();
00161          }
00162          if (!dirPath.Contains("<qnum>")) {
00163             if (!dirPath.EndsWith("/")) dirPath += "/";
00164             dirPath += "<qnum>";
00165          }
00166       }
00167       // Resolve the relevant placeholders
00168       TProofServ::ResolveKeywords(dirPath, 0);
00169       // Save the raw directory
00170       fRawDir = dirPath;
00171       // Make sure the the path exists
00172       // Locate the portion of path already existing and get its mode: we make sure that this
00173       // mode applies to all new subpaths created
00174       TString existsPath(dirPath);
00175       TList subPaths;
00176       while (existsPath != "/" && existsPath != "." && gSystem->AccessPathName(existsPath)) {
00177          subPaths.AddFirst(new TObjString(gSystem->BaseName(existsPath)));
00178          existsPath = gSystem->DirName(existsPath);
00179       }
00180       subPaths.SetOwner(kTRUE);
00181       FileStat_t st;
00182       if (gSystem->GetPathInfo(existsPath, st) == 0) {
00183          TString xpath = existsPath;
00184          TIter nxp(&subPaths);
00185          TObjString *os = 0;
00186          while ((os = (TObjString *) nxp())) {
00187             xpath += TString::Format("/%s", os->GetName());
00188             if (gSystem->mkdir(xpath, kTRUE) == 0) {
00189                if (gSystem->Chmod(xpath, (UInt_t) st.fMode) != 0)
00190                   Warning("Init", "problems setting mode on '%s'", xpath.Data());
00191             } else {
00192                Error("Init", "problems creating path '%s'", xpath.Data());
00193             }
00194          }
00195       } else {
00196          Warning("Init", "could not get info for path '%s': will only try to create"
00197                          " the full path w/o trying to set the mode", existsPath.Data());
00198          if (gSystem->mkdir(existsPath, kTRUE) != 0)
00199             Error("Init", "problems creating path '%s'", existsPath.Data());
00200       }
00201       // Remove prefix, if any and if included
00202       TString pfx  = gEnv->GetValue("Path.Localroot","");
00203       if (!pfx.IsNull() && dirPath.BeginsWith(pfx)) dirPath.Remove(0, pfx.Length());
00204       // Check if a local data server has been specified
00205       if (gSystem->Getenv("LOCALDATASERVER")) {
00206          fDir = gSystem->Getenv("LOCALDATASERVER");
00207          if (!fDir.EndsWith("/")) fDir += "/";
00208       }
00209       fDir += dirPath;
00210    } else {
00211       // Allow for place holders in fFileName (e.g. root://a.ser.ver//data/dir/<group>/<user>/file)
00212       TProofServ::ResolveKeywords(fFileName, 0);
00213    }
00214    // Notify
00215    Info("Init", "dir: %s (raw: %s)", fDir.Data(), fRawDir.Data());
00216 
00217    // Default output file name
00218    fOutputFileName = gEnv->GetValue("Proof.OutputFile", "<file>");
00219    // Add default file name
00220    TString fileName = path;
00221    if (!fileName.EndsWith(".root")) fileName += ".root";
00222    // Make sure that the file name was inserted (may not happen if the placeholder <file> is missing)
00223    if (!fOutputFileName.IsNull() && !fOutputFileName.Contains("<file>")) {
00224       if (!fOutputFileName.EndsWith("/")) fOutputFileName += "/";
00225          fOutputFileName += fileName;
00226    }
00227    // Resolve placeholders
00228    fileName.ReplaceAll("<ord>",""); // No ordinal in the final merged file
00229    TProofServ::ResolveKeywords(fOutputFileName, fileName);
00230    Info("Init", "output file url: %s", fOutputFileName.Data());
00231    // Fill ordinal
00232    fWorkerOrdinal = "<ord>";
00233    TProofServ::ResolveKeywords(fWorkerOrdinal, 0);
00234 }
00235 
00236 //________________________________________________________________________________
00237 TProofOutputFile::~TProofOutputFile()
00238 {
00239    // Main destructor
00240 
00241    if (fDataSet) delete fDataSet;
00242    if (fMerger) delete fMerger;
00243 }
00244 
00245 //______________________________________________________________________________
00246 void TProofOutputFile::SetFileName(const char* name)
00247 {
00248    // Set the file name
00249 
00250    fFileName = name;
00251 }
00252 
00253 //______________________________________________________________________________
00254 void TProofOutputFile::SetOutputFileName(const char *name)
00255 {
00256    // Set the name of the output file; in the form of an Url.
00257 
00258    if (name && strlen(name) > 0) {
00259       fOutputFileName = name;
00260       TProofServ::ResolveKeywords(fOutputFileName);
00261       Info("SetOutputFileName", "output file url: %s", fOutputFileName.Data());
00262    } else {
00263       fOutputFileName = "";
00264    }
00265 }
00266 
00267 //______________________________________________________________________________
00268 TFile* TProofOutputFile::OpenFile(const char* opt)
00269 {
00270    // Open the file using the unique temporary name
00271 
00272    if (fFileName.IsNull()) return 0;
00273 
00274    // Create the path
00275    TString fileLoc;
00276    fileLoc.Form("%s/%s%s", fRawDir.Data(), fFileName.Data(), fOptionsAnchor.Data());
00277 
00278    // Open the file
00279    TFile *retFile = TFile::Open(fileLoc, opt);
00280 
00281    return retFile;
00282 }
00283 
00284 //______________________________________________________________________________
00285 Int_t TProofOutputFile::AdoptFile(TFile *f)
00286 {
00287    // Adopt a file already open.
00288    // Return 0 if OK, -1 in case of failure
00289 
00290    if (!f || f->IsZombie())
00291       return -1;
00292 
00293    // Set the name and dir
00294    TUrl u(*(f->GetEndpointUrl()));
00295    fIsLocal = kFALSE;
00296    if (!strcmp(u.GetProtocol(), "file")) {
00297       fIsLocal = kTRUE;
00298       fDir = u.GetFile();
00299    } else {
00300       fDir = u.GetUrl();
00301    }
00302    fFileName = gSystem->BaseName(fDir.Data());
00303    fDir.ReplaceAll(fFileName, "");
00304    fRawDir = fDir;
00305 
00306    // Remove prefix, if any
00307    TString pfx  = gEnv->GetValue("Path.Localroot","");
00308    if (!pfx.IsNull()) fDir.ReplaceAll(pfx, "");
00309    // Include the local data server info, if any
00310    if (gSystem->Getenv("LOCALDATASERVER")) {
00311       TString localDS(gSystem->Getenv("LOCALDATASERVER"));
00312       if (!localDS.EndsWith("/")) localDS += "/";
00313       fDir.Insert(0, localDS);
00314    }
00315 
00316    return 0;
00317 }
00318 
00319 //______________________________________________________________________________
00320 Long64_t TProofOutputFile::Merge(TCollection* list)
00321 {
00322    // Merge objects from the list into this object
00323 
00324    // Needs somethign to merge
00325    if(!list || list->IsEmpty()) return 0;
00326 
00327    if (IsMerge()) {
00328       // Build-up the merger
00329       TString fileLoc;
00330       TString outputFileLoc = (fOutputFileName.IsNull()) ? fFileName : fOutputFileName;
00331       // Get the file merger instance
00332       Bool_t localMerge = (fRunType == kMerge && fTypeOpt == kLocal) ? kTRUE : kFALSE;
00333       TFileMerger *merger = GetFileMerger(localMerge);
00334       if (!merger) {
00335          Error("Merge", "could not instantiate the file merger");
00336          return -1;
00337       }
00338 
00339       if (!fMerged) {
00340          merger->OutputFile(outputFileLoc);
00341          fileLoc.Form("%s/%s", fDir.Data(), GetFileName());
00342          AddFile(merger, fileLoc);
00343          fMerged = kTRUE;
00344       }
00345 
00346       TIter next(list);
00347       TObject *o = 0;
00348       while((o = next())) {
00349          TProofOutputFile *pFile = dynamic_cast<TProofOutputFile *>(o);
00350          if (pFile) {
00351             fileLoc = Form("%s/%s", pFile->GetDir(), pFile->GetFileName());
00352             AddFile(merger, fileLoc);
00353          }
00354       }
00355    } else {
00356       // Get the reference MSS url, if any
00357       TUrl mssUrl(gEnv->GetValue("ProofServ.PoolUrl",""));
00358       // Build-up the TFileCollection
00359       TFileCollection *dataset = GetFileCollection();
00360       if (!dataset) {
00361          Error("Merge", "could not instantiate the file collection");
00362          return -1;
00363       }
00364       TString path;
00365       TFileInfo *fi = 0;
00366       // If new, add ourseelves
00367       dataset->Update();
00368       if (dataset->GetNFiles() == 0) {
00369          // Save the export and raw urls
00370          path.Form("%s/%s%s", GetDir(), GetFileName(), GetOptionsAnchor());
00371          fi = new TFileInfo(path);
00372          // Add also an URL with the redirector path, if any
00373          if (mssUrl.IsValid()) {
00374             TUrl ur(fi->GetFirstUrl()->GetUrl());
00375             ur.SetProtocol(mssUrl.GetProtocol());
00376             ur.SetHost(mssUrl.GetHost());
00377             ur.SetPort(mssUrl.GetPort());
00378             if (mssUrl.GetUser() && strlen(mssUrl.GetUser()) > 0)
00379                ur.SetUser(mssUrl.GetUser());
00380             fi->AddUrl(ur.GetUrl());
00381          }
00382          // Add special local URL to keep track of the file
00383          path.Form("%s/%s?node=%s", GetDir(kTRUE), GetFileName(), GetLocalHost());
00384          fi->AddUrl(path);
00385          fi->Print();
00386          // Now add to the dataset
00387          dataset->Add(fi);
00388       }
00389 
00390       TIter next(list);
00391       TObject *o = 0;
00392       while((o = next())) {
00393          TProofOutputFile *pFile = dynamic_cast<TProofOutputFile *>(o);
00394          if (pFile) {
00395             // Save the export and raw urls
00396             path.Form("%s/%s%s", pFile->GetDir(), pFile->GetFileName(), pFile->GetOptionsAnchor());
00397             fi = new TFileInfo(path);
00398             // Add also an URL with the redirector path, if any
00399             if (mssUrl.IsValid()) {
00400                TUrl ur(fi->GetFirstUrl()->GetUrl());
00401                ur.SetProtocol(mssUrl.GetProtocol());
00402                ur.SetHost(mssUrl.GetHost());
00403                ur.SetPort(mssUrl.GetPort());
00404                if (mssUrl.GetUser() && strlen(mssUrl.GetUser()) > 0)
00405                   ur.SetUser(mssUrl.GetUser());
00406                fi->AddUrl(ur.GetUrl());
00407             }
00408             // Add special local URL to keep track of the file
00409             path.Form("%s/%s?node=%s", pFile->GetDir(kTRUE), pFile->GetFileName(), pFile->GetLocalHost());
00410             fi->AddUrl(path);
00411             fi->Print();
00412             // Now add to the dataset
00413             dataset->Add(fi);
00414          }
00415       }
00416    }
00417 
00418    // Done
00419    return 0;
00420 }
00421 
00422 //______________________________________________________________________________
00423 void TProofOutputFile::Print(Option_t *) const
00424 {
00425    // Dump the class content
00426 
00427    Info("Print","-------------- %s : start (%s) ------------", GetName(), fLocalHost.Data());
00428    Info("Print"," dir:              %s", fDir.Data());
00429    Info("Print"," raw dir:          %s", fRawDir.Data());
00430    Info("Print"," file name:        %s%s", fFileName.Data(), fOptionsAnchor.Data());
00431    if (IsMerge()) {
00432       Info("Print"," run type:         create a merged file");
00433       Info("Print"," merging option:   %s",
00434                        (fTypeOpt == kLocal) ? "local copy" : "keep remote");
00435    } else {
00436       TString opt;
00437       if ((fTypeOpt & kRegister)) opt += "R";
00438       if ((fTypeOpt & kOverwrite)) opt += "O";
00439       if ((fTypeOpt & kVerify)) opt += "V";
00440       Info("Print"," run type:         create dataset (name: '%s', opt: '%s')",
00441                                          GetTitle(), opt.Data());
00442    }
00443    Info("Print"," output file name: %s", fOutputFileName.Data());
00444    Info("Print"," ordinal:          %s", fWorkerOrdinal.Data());
00445    Info("Print","-------------- %s : done -------------", GetName());
00446 
00447    return;
00448 }
00449 
00450 //______________________________________________________________________________
00451 void TProofOutputFile::NotifyError(const char *msg)
00452 {
00453    // Notify error message
00454 
00455    if (msg) {
00456       if (gProofServ)
00457          gProofServ->SendAsynMessage(msg);
00458       else
00459          Printf("%s", msg);
00460    } else {
00461       Info("NotifyError","called with empty message");
00462    }
00463 
00464    return;
00465 }
00466 
00467 //______________________________________________________________________________
00468 void TProofOutputFile::AddFile(TFileMerger *merger, const char *path)
00469 {
00470    // Add file to merger, checking the result
00471 
00472    if (merger && path) {
00473       if (!merger->AddFile(path))
00474          NotifyError(Form("TProofOutputFile::AddFile:"
00475                           " error from TFileMerger::AddFile(%s)", path));
00476    }
00477 }
00478 
00479 //______________________________________________________________________________
00480 void TProofOutputFile::Unlink(const char *path)
00481 {
00482    // Unlink path
00483 
00484    if (path) {
00485       if (!gSystem->AccessPathName(path)) {
00486          if (gSystem->Unlink(path) != 0)
00487             NotifyError(Form("TProofOutputFile::Unlink:"
00488                              " error from TSystem::Unlink(%s)", path));
00489       }
00490    }
00491 }
00492 
00493 //______________________________________________________________________________
00494 TFileCollection *TProofOutputFile::GetFileCollection()
00495 {
00496    // Get instance of the file collection to be used in 'dataset' mode
00497 
00498    if (!fDataSet)
00499       fDataSet = new TFileCollection(GetTitle());
00500    return fDataSet;
00501 }
00502 
00503 //______________________________________________________________________________
00504 TFileMerger *TProofOutputFile::GetFileMerger(Bool_t local)
00505 {
00506    // Get instance of the file merger to be used in 'merge' mode
00507 
00508    if (!fMerger)
00509       fMerger = new TFileMerger(local);
00510    return fMerger;
00511 }

Generated on Tue Jul 5 14:52:19 2011 for ROOT_528-00b_version by  doxygen 1.5.1