TDataSetManager.cxx

Go to the documentation of this file.
00001 // @(#)root/base:$Id: TDataSetManager.cxx 37898 2011-01-27 15:25:40Z ganis $
00002 // Author: Jan Fiete Grosse-Oetringhaus, 04.06.07
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TDataSetManager                                                 //
00015 //                                                                      //
00016 // This class contains functions to handle datasets in PROOF            //
00017 // It is the layer between TProofServ and the file system that stores   //
00018 // the datasets.                                                        //
00019 //                                                                      //
00020 //////////////////////////////////////////////////////////////////////////
00021 
00022 
00023 #include "TDataSetManager.h"
00024 
00025 #include "Riostream.h"
00026 
00027 #include "TEnv.h"
00028 #include "TError.h"
00029 #include "TFile.h"
00030 #include "TFileCollection.h"
00031 #include "TFileInfo.h"
00032 #include "TFileStager.h"
00033 #include "TMD5.h"
00034 #include "THashList.h"
00035 #include "TKey.h"
00036 #include "TObjArray.h"
00037 #include "TObjString.h"
00038 #include "TParameter.h"
00039 #include "TPRegexp.h"
00040 #include "TRegexp.h"
00041 #include "TSystem.h"
00042 #include "TTree.h"
00043 #include "TUrl.h"
00044 #include "TVirtualMonitoring.h"
00045 
00046 // One Gigabyte
00047 #define DSM_ONE_GB (1073741824)
00048 
00049 // Name for common datasets
00050 TString TDataSetManager::fgCommonDataSetTag = "COMMON";
00051 TList   *TDataSetManager::fgDataSetSrvMaps = 0;
00052 
00053 ClassImp(TDataSetManager)
00054 
00055 //_____________________________________________________________________________
00056 TDataSetManager::TDataSetManager(const char *group, const char *user,
00057                                            const char *options)
00058                      : fGroup(group),
00059                        fUser(user), fCommonUser(), fCommonGroup(),
00060                        fGroupQuota(), fGroupUsed(),
00061                        fUserUsed(), fNTouchedFiles(0), fNOpenedFiles(0),
00062                        fNDisappearedFiles(0), fMTimeGroupConfig(-1)
00063 {
00064    //
00065    // Main constructor
00066 
00067    // Fill default group and user if none is given
00068    if (fGroup.IsNull())
00069       fGroup = "default";
00070    if (fUser.IsNull()) {
00071       fUser = "--nouser--";
00072       // Get user logon name
00073       UserGroup_t *pw = gSystem->GetUserInfo();
00074       if (pw) {
00075          fUser = pw->fUser;
00076          delete pw;
00077       }
00078    }
00079 
00080    fGroupQuota.SetOwner();
00081    fGroupUsed.SetOwner();
00082    fUserUsed.SetOwner();
00083 
00084    fCommonUser = "COMMON";
00085    fCommonGroup = "COMMON";
00086 
00087    fNTouchedFiles = -1;
00088    fNOpenedFiles = -1;
00089    fNDisappearedFiles = -1;
00090    fMTimeGroupConfig = -1;
00091 
00092    fAvgFileSize = 50000000;  // Default 50 MB per file
00093 
00094    // Parse options
00095    ParseInitOpts(options);
00096 
00097    if (!fUser.IsNull() && !fGroup.IsNull()) {
00098 
00099       // If not in sandbox, construct the base URI using session defaults
00100       // (group, user) (syntax: /group/user/dsname[#[subdir/]objname])
00101       if (!TestBit(TDataSetManager::kIsSandbox))
00102          fBase.SetUri(TString(Form("/%s/%s/", fGroup.Data(), fUser.Data())));
00103 
00104    }
00105 
00106    // List of dataset server mapping instructions
00107    TString srvmaps(gEnv->GetValue("DataSet.SrvMaps",""));
00108    TString srvmapsenv(gSystem->Getenv("DATASETSRVMAPS"));
00109    if (!(srvmapsenv.IsNull())) {
00110       if (srvmapsenv.BeginsWith("+")) {
00111          if (!(srvmaps.IsNull())) srvmaps += ",";
00112          srvmaps += srvmapsenv(1,srvmapsenv.Length());
00113       } else {
00114          srvmaps = srvmapsenv;
00115       }
00116    }
00117    if (!(srvmaps.IsNull()) && !(fgDataSetSrvMaps = ParseDataSetSrvMaps(srvmaps)))
00118       Warning("TDataSetManager", "problems parsing DataSet.SrvMaps input info (%s)"
00119                                  " - ignoring", srvmaps.Data());
00120 
00121    // Read config file
00122    ReadGroupConfig(gEnv->GetValue("Proof.GroupFile", ""));
00123 }
00124 
00125 //______________________________________________________________________________
00126 TDataSetManager::~TDataSetManager()
00127 {
00128    // Destructor
00129 
00130    // Clear used space
00131    fGroupQuota.DeleteAll();
00132    fGroupUsed.DeleteAll();
00133    fUserUsed.DeleteAll();
00134 }
00135 
00136 //______________________________________________________________________________
00137 void TDataSetManager::ParseInitOpts(const char *opts)
00138 {
00139    // Parse the opts string and set the init bits accordingly
00140    // Available options:
00141    //    Cq:               set kCheckQuota
00142    //    Ar:               set kAllowRegister
00143    //    Av:               set kAllowVerify
00144    //    Ti:               set kTrustInfo
00145    //    Sb:               set kIsSandbox
00146    //    Ca:               set kUseCache or kDoNotUseCache
00147    // The opts string may also contain additional unrelated info: in such a case
00148    // the field delimited by the prefix "opt:" is analyzed, e.g. if opts is
00149    // "/tmp/dataset  opt:Cq:-Ar: root://lxb6046.cern.ch" only the substring
00150    // "Cq:-Ar:" will be parsed .
00151 
00152    // Default option bits
00153    ResetBit(TDataSetManager::kCheckQuota);
00154    SetBit(TDataSetManager::kAllowRegister);
00155    SetBit(TDataSetManager::kAllowVerify);
00156    SetBit(TDataSetManager::kTrustInfo);
00157    ResetBit(TDataSetManager::kIsSandbox);
00158    ResetBit(TDataSetManager::kUseCache);
00159    ResetBit(TDataSetManager::kDoNotUseCache);
00160 
00161    if (opts && strlen(opts) > 0) {
00162       TString opt(opts);
00163       // If it contains the prefix "opt:", isolate the related field
00164       Int_t ip = opt.Index("opt:");
00165       if (ip != kNPOS) opt.Remove(0, ip + 4);
00166       ip = opt.Index(" ");
00167       if (ip != kNPOS) opt.Remove(ip);
00168       // Check the content, now
00169       if (opt.Contains("Cq:") && !opt.Contains("-Cq:"))
00170          SetBit(TDataSetManager::kCheckQuota);
00171       if (opt.Contains("-Ar:"))
00172          ResetBit(TDataSetManager::kAllowRegister);
00173       if (opt.Contains("-Av:"))
00174          ResetBit(TDataSetManager::kAllowVerify);
00175       if (opt.Contains("-Ti:"))
00176          ResetBit(TDataSetManager::kTrustInfo);
00177       if (opt.Contains("Sb:") && !opt.Contains("-Sb:"))
00178          SetBit(TDataSetManager::kIsSandbox);
00179       if (opt.Contains("Ca:"))
00180          SetBit(TDataSetManager::kUseCache);
00181       if (opt.Contains("-Ca:"))
00182          SetBit(TDataSetManager::kDoNotUseCache);
00183    }
00184 
00185    // Check dependencies
00186    if (TestBit(TDataSetManager::kAllowVerify)) {
00187       // Dataset verification or requires registration permition
00188       SetBit(TDataSetManager::kAllowRegister);
00189    }
00190    // UseCache has priority
00191    if (TestBit(TDataSetManager::kUseCache) && TestBit(TDataSetManager::kDoNotUseCache))
00192       ResetBit(TDataSetManager::kDoNotUseCache);
00193 }
00194 
00195 //______________________________________________________________________________
00196 Bool_t TDataSetManager::ReadGroupConfig(const char *cf)
00197 {
00198    // Read group config file 'cf'.
00199    // If cf == 0 re-read, if changed, the file pointed by fGroupConfigFile .
00200    //
00201    // expects the following directives:
00202    // Group definition:
00203    //   group <groupname> <user>+
00204    // disk quota
00205    //   property <groupname> diskquota <quota in GB>
00206    // average filesize (to be used when the file size is not available)
00207    //   averagefilesize <average size>{G,g,M,m,K,k}
00208 
00209    // Validate input
00210    FileStat_t st;
00211    if (!cf || (strlen(cf) <= 0) || !strcmp(cf, fGroupConfigFile.Data())) {
00212       // If this is the first time we cannot do anything
00213       if (fGroupConfigFile.IsNull()) {
00214          if (gDebug > 0)
00215             Info("ReadGroupConfig", "path to config file undefined - nothing to do");
00216          return kFALSE;
00217       }
00218       // Check if fGroupConfigFile has changed
00219       if (gSystem->GetPathInfo(fGroupConfigFile, st)) {
00220          Error("ReadGroupConfig", "could not stat %s", fGroupConfigFile.Data());
00221          return kFALSE;
00222       }
00223       if (st.fMtime <= fMTimeGroupConfig) {
00224          if (gDebug > 0)
00225             Info("ReadGroupConfig","file has not changed - do nothing");
00226          return kTRUE;
00227       }
00228    }
00229 
00230    // Either new file or the file has changed
00231    if (cf && (strlen(cf) > 0)) {
00232       // The file must exist and be readable
00233       if (gSystem->GetPathInfo(cf, st)) {
00234          Error("ReadGroupConfig", "could not stat %s", cf);
00235          return kFALSE;
00236       }
00237       if (gSystem->AccessPathName(cf, kReadPermission)) {
00238          Error("ReadGroupConfig", "cannot read %s", cf);
00239          return kFALSE;
00240       }
00241       // Ok
00242       fGroupConfigFile = cf;
00243       fMTimeGroupConfig = st.fMtime;
00244    }
00245 
00246    if (gDebug > 0)
00247       Info("ReadGroupConfig","reading group config from %s", cf);
00248 
00249    // Open the config file
00250    ifstream in;
00251    in.open(cf);
00252    if (!in.is_open()) {
00253       Error("ReadGroupConfig", "could not open config file %s", cf);
00254       return kFALSE;
00255    }
00256 
00257    // Container for the global common user
00258    TString tmpCommonUser;
00259 
00260    // Go through
00261    TString line;
00262    while (in.good()) {
00263       // Read new line
00264       line.ReadLine(in);
00265       // Explicitely skip comment lines
00266       if (line[0] == '#') continue;
00267       // Parse it
00268       Ssiz_t from = 0;
00269       TString key;
00270       if (!line.Tokenize(key, from, " ")) // No token
00271          continue;
00272       // Parsing depends on the key
00273       if (key == "property") {
00274          // Read group
00275          TString grp;
00276          if (!line.Tokenize(grp, from, " ")) {// No token
00277             if (gDebug > 0)
00278                Info("ReadGroupConfig","incomplete line: '%s'", line.Data());
00279             continue;
00280          }
00281          // Read type of property
00282          TString type;
00283          if (!line.Tokenize(type, from, " ")) // No token
00284             continue;
00285          if (type == "diskquota") {
00286             // Read diskquota
00287             TString sdq;
00288             if (!line.Tokenize(sdq, from, " ")) // No token
00289                continue;
00290             // Enforce GigaBytes as default
00291             if (sdq.IsDigit()) sdq += "G";
00292             Long64_t quota = ToBytes(sdq);
00293             if (quota > -1) {
00294                fGroupQuota.Add(new TObjString(grp),
00295                                new TParameter<Long64_t> ("group quota", quota));
00296             } else {
00297                Warning("ReadGroupConfig",
00298                        "problems parsing string: wrong or unsupported suffix? %s",
00299                         sdq.Data());
00300             }
00301          } else if (type == "commonuser") {
00302             // Read common user for this group
00303             TString comusr;
00304             if (!line.Tokenize(comusr, from, " ")) // No token
00305                continue;
00306 
00307          }
00308 
00309       } else if (key == "dataset") {
00310          // Read type
00311          TString type;
00312          if (!line.Tokenize(type, from, " ")) {// No token
00313             if (gDebug > 0)
00314                Info("ReadGroupConfig","incomplete line: '%s'", line.Data());
00315             continue;
00316          }
00317          if (type == "commonuser") {
00318             // Read global common user
00319             TString comusr;
00320             if (!line.Tokenize(comusr, from, " ")) // No token
00321                continue;
00322             fCommonUser = comusr;
00323          } else if (type == "commongroup") {
00324             // Read global common group
00325             TString comgrp;
00326             if (!line.Tokenize(comgrp, from, " ")) // No token
00327                continue;
00328             fCommonGroup = comgrp;
00329          } else if (type == "diskquota") {
00330             // Quota check switch
00331             TString on;
00332             if (!line.Tokenize(on, from, " ")) // No token
00333                continue;
00334             if (on == "on") {
00335                SetBit(TDataSetManager::kCheckQuota);
00336             } else if (on == "off") {
00337                ResetBit(TDataSetManager::kCheckQuota);
00338             }
00339          }
00340 
00341       } else if (key == "averagefilesize") {
00342 
00343          // Read average size
00344          TString avgsize;
00345          if (!line.Tokenize(avgsize, from, " ")) {// No token
00346             if (gDebug > 0)
00347                Info("ReadGroupConfig","incomplete line: '%s'", line.Data());
00348             continue;
00349          }
00350          Long64_t avgsz = ToBytes(avgsize);
00351          if (avgsz > -1) {
00352             fAvgFileSize = avgsz;
00353          } else {
00354             Warning("ReadGroupConfig",
00355                     "problems parsing string: wrong or unsupported suffix? %s",
00356                     avgsize.Data());
00357          }
00358       } else if (key == "include") {
00359 
00360          // Read file to include
00361          TString subfn;
00362          if (!line.Tokenize(subfn, from, " ")) {// No token
00363             if (gDebug > 0)
00364                Info("ReadGroupConfig","incomplete line: '%s'", line.Data());
00365             continue;
00366          }
00367          // The file must be readable
00368          if (gSystem->AccessPathName(subfn, kReadPermission)) {
00369             Error("ReadGroupConfig", "request to parse file '%s' which is not readable",
00370                                      subfn.Data());
00371             continue;
00372          }
00373          if (!ReadGroupConfig(subfn))
00374             Error("ReadGroupConfig", "problems parsing include file '%s'", subfn.Data());
00375       }
00376    }
00377    in.close();
00378 
00379    return kTRUE;
00380 }
00381 
00382 //______________________________________________________________________________
00383 Long64_t TDataSetManager::ToBytes(const char *size)
00384 {
00385    // Static utility function to gt the number of bytes from a string
00386    // representation in the form "<digit><sfx>" with <sfx> = {"", "k", "M", "G",
00387    // "T", "P"} (case insensitive).
00388    // Returns -1 if the format is wrong.
00389 
00390    Long64_t lsize = -1;
00391 
00392    // Check if valid
00393    if (!size || strlen(size) <= 0) return lsize;
00394 
00395    TString s(size);
00396    // Determine factor
00397    Long64_t fact = 1;
00398    if (!s.IsDigit()) {
00399       const char *unit[5] = { "k", "M", "G", "T", "P"};
00400       fact = 1024;
00401       Int_t jj = 0;
00402       while (jj <= 4) {
00403          if (s.EndsWith(unit[jj], TString::kIgnoreCase)) {
00404             s.Remove(s.Length()-1);
00405             break;
00406          }
00407          fact *= 1024;
00408          jj++;
00409       }
00410    }
00411    // Apply factor now
00412    if (s.IsDigit())
00413       lsize = s.Atoi() * fact;
00414 
00415    // Done
00416    return lsize;
00417 }
00418 
00419 //______________________________________________________________________________
00420 TFileCollection *TDataSetManager::GetDataSet(const char *, const char *)
00421 {
00422    // Utility function used in various methods for user dataset upload.
00423 
00424    AbstractMethod("GetDataSet");
00425    return (TFileCollection *)0;
00426 }
00427 
00428 //______________________________________________________________________________
00429 Bool_t TDataSetManager::RemoveDataSet(const char *)
00430 {
00431    // Removes the indicated dataset
00432 
00433    AbstractMethod("RemoveDataSet");
00434    return kFALSE;
00435 }
00436 
00437 //______________________________________________________________________________
00438 Bool_t TDataSetManager::ExistsDataSet(const char *)
00439 {
00440    // Checks if the indicated dataset exits
00441 
00442    AbstractMethod("ExistsDataSet");
00443    return kFALSE;
00444 }
00445 
00446 //______________________________________________________________________________
00447 TMap *TDataSetManager::GetDataSets(const char *, UInt_t)
00448 {
00449    //
00450    // Returns all datasets for the <group> and <user> specified by <uri>.
00451    // If <user> is 0, it returns all datasets for the given <group>.
00452    // If <group> is 0, it returns all datasets.
00453    // The returned TMap contains:
00454    //    <group> --> <map of users> --> <map of datasets> --> <dataset> (TFileCollection)
00455    //
00456    // The unsigned int 'option' is forwarded to GetDataSet and BrowseDataSet.
00457    // Available options (to be .or.ed):
00458    //    kShowDefault    a default selection is shown that include the ones from
00459    //                    the current user, the ones from the group and the common ones
00460    //    kPrint          print the dataset content
00461    //    kQuotaUpdate    update quotas
00462    //    kExport         use export naming
00463    //
00464    // NB1: options "kPrint", "kQuoatUpdate" and "kExport" are mutually exclusive
00465    // NB2: for options "kPrint" and "kQuoatUpdate" return is null.
00466 
00467    AbstractMethod("GetDataSets");
00468 
00469    return (TMap *)0;
00470 }
00471 //______________________________________________________________________________
00472 Int_t TDataSetManager::ScanDataSet(const char *uri, const char *opts)
00473 {
00474    // Scans the dataset indicated by 'uri' following the 'opts' directives
00475    //
00476    // The 'opts' string contains up to 4 directive fields separated by ':'
00477    //
00478    //  'selection' field :
00479    //    A, allfiles:    process all files
00480    //    D, staged:      process only staged (on Disk) files (if 'allfiles:' is not specified
00481    //                    the default is to process only files marked as non-staged)
00482    //  'pre-action field':
00483    //    O, open:        open the files marked as staged when processing only files
00484    //                    marked as non-staged
00485    //    T, touch:       open and touch the files marked as staged when processing
00486    //                    only files marked as non-staged
00487    //    I, nostagedcheck: do not check the actual stage status on selected files
00488    //
00489    //  'process' field:
00490    //    N, noaction:    do nothing on the selected files
00491    //    P, fullproc:    open the selected files and extract the meta information
00492    //    L, locateonly:  only locate the selected files
00493    //    S, stageonly:   issue a stage request for the selected files not yet staged
00494    //
00495    //  'auxilliary' field
00496    //    V, verbose:     notify the actions
00497    //
00498    // Returns 0 on success, -1 if any failure occurs.
00499 
00500    // Extract the directives
00501    UInt_t o = 0;
00502    if (opts) {
00503       // Selection options
00504       if (strstr(opts, "allfiles:") || strchr(opts, 'A'))
00505          o |= kAllFiles;
00506       else if (strstr(opts, "staged:") || strchr(opts, 'D'))
00507          o |= kStagedFiles;
00508       // Pre-action options
00509       if (strstr(opts, "open:") || strchr(opts, 'O'))
00510          o |= kReopen;
00511       if (strstr(opts, "touch:") || strchr(opts, 'T'))
00512          o |= kTouch;
00513       if (strstr(opts, "nostagedcheck:") || strchr(opts, 'I'))
00514          o |= kNoStagedCheck;
00515       // Process options
00516       if (strstr(opts, "noaction:") || strchr(opts, 'N'))
00517          o |= kNoAction;
00518       if (strstr(opts, "locateonly:") || strchr(opts, 'L'))
00519          o |= kLocateOnly;
00520       if (strstr(opts, "stageonly:") || strchr(opts, 'S'))
00521          o |= kStageOnly;
00522       // Auxilliary options
00523       if (strstr(opts, "verbose:") || strchr(opts, 'V'))
00524          o |= kDebug;
00525    } else {
00526       // Default
00527       o = kReopen | kDebug;
00528    }
00529 
00530    // Run
00531    return ScanDataSet(uri, o);
00532 }
00533 
00534 //______________________________________________________________________________
00535 Int_t TDataSetManager::ScanDataSet(const char *, UInt_t)
00536 {
00537    // Scans the dataset indicated by <uri> and returns the number of missing files.
00538    // Returns -1 if any failure occurs.
00539    // For more details, see documentation of
00540    // ScanDataSet(TFileCollection *dataset, const char *option)
00541 
00542    AbstractMethod("ScanDataSet");
00543 
00544    return -1;
00545 }
00546 
00547 //______________________________________________________________________________
00548 void TDataSetManager::GetQuota(const char *group, const char *user,
00549                                     const char *dsName, TFileCollection *dataset)
00550 {
00551    //
00552    // Gets quota information from this dataset
00553 
00554    if (gDebug > 0)
00555       Info("GetQuota", "processing dataset %s %s %s", group, user, dsName);
00556 
00557    if (dataset->GetTotalSize() > 0) {
00558       TParameter<Long64_t> *size =
00559          dynamic_cast<TParameter<Long64_t>*> (fGroupUsed.GetValue(group));
00560       if (!size) {
00561          size = new TParameter<Long64_t> ("group used", 0);
00562          fGroupUsed.Add(new TObjString(group), size);
00563       }
00564 
00565       size->SetVal(size->GetVal() + dataset->GetTotalSize());
00566 
00567       TMap *userMap = dynamic_cast<TMap*> (fUserUsed.GetValue(group));
00568       if (!userMap) {
00569          userMap = new TMap;
00570          fUserUsed.Add(new TObjString(group), userMap);
00571       }
00572 
00573       size = dynamic_cast<TParameter<Long64_t>*> (userMap->GetValue(user));
00574       if (!size) {
00575          size = new TParameter<Long64_t> ("user used", 0);
00576          userMap->Add(new TObjString(user), size);
00577       }
00578 
00579       size->SetVal(size->GetVal() + dataset->GetTotalSize());
00580    }
00581 }
00582 
00583 //______________________________________________________________________________
00584 void TDataSetManager::ShowQuota(const char *opt)
00585 {
00586    // Display quota information
00587 
00588    UpdateUsedSpace();
00589 
00590    TMap *groupQuotaMap = GetGroupQuotaMap();
00591    TMap *userUsedMap = GetUserUsedMap();
00592    if (!groupQuotaMap || !userUsedMap)
00593       return;
00594 
00595    Bool_t noInfo = kTRUE;
00596    TIter iter(groupQuotaMap);
00597    TObjString *group = 0;
00598    while ((group = dynamic_cast<TObjString*> (iter.Next()))) {
00599       noInfo = kFALSE;
00600       Long64_t groupQuota = GetGroupQuota(group->String());
00601       Long64_t groupUsed = GetGroupUsed(group->String());
00602 
00603       Printf(" +++ Group %s uses %.1f GB out of %.1f GB", group->String().Data(),
00604                                         (Float_t) groupUsed / DSM_ONE_GB,
00605                                        (Float_t) groupQuota / DSM_ONE_GB);
00606 
00607       // display also user information
00608       if (opt && !TString(opt).Contains("U", TString::kIgnoreCase))
00609          continue;
00610 
00611       TMap *userMap = dynamic_cast<TMap*> (userUsedMap->GetValue(group->String()));
00612       if (!userMap)
00613          continue;
00614 
00615       TIter iter2(userMap);
00616       TObjString *user = 0;
00617       while ((user = dynamic_cast<TObjString*> (iter2.Next()))) {
00618          TParameter<Long64_t> *size2 =
00619             dynamic_cast<TParameter<Long64_t>*> (userMap->GetValue(user->String().Data()));
00620          if (!size2)
00621             continue;
00622 
00623          Printf(" +++  User %s uses %.1f GB", user->String().Data(),
00624                                   (Float_t) size2->GetVal() / DSM_ONE_GB);
00625       }
00626 
00627       Printf("------------------------------------------------------");
00628    }
00629    // Check if something has been printed
00630    if (noInfo) {
00631       Printf(" +++ Quota check enabled but no quota info available +++ ");
00632    }
00633 }
00634 
00635 //______________________________________________________________________________
00636 void TDataSetManager::PrintUsedSpace()
00637 {
00638    //
00639    // Prints the quota
00640 
00641    Info("PrintUsedSpace", "listing used space");
00642 
00643    TIter iter(&fUserUsed);
00644    TObjString *group = 0;
00645    while ((group = dynamic_cast<TObjString*> (iter.Next()))) {
00646       TMap *userMap = dynamic_cast<TMap*> (fUserUsed.GetValue(group->String()));
00647 
00648       TParameter<Long64_t> *size =
00649          dynamic_cast<TParameter<Long64_t>*> (fGroupUsed.GetValue(group->String()));
00650 
00651       if (userMap && size) {
00652          Printf("Group %s: %lld B = %.2f GB", group->String().Data(), size->GetVal(),
00653                                       (Float_t) size->GetVal() / DSM_ONE_GB);
00654 
00655          TIter iter2(userMap);
00656          TObjString *user = 0;
00657          while ((user = dynamic_cast<TObjString*> (iter2.Next()))) {
00658             TParameter<Long64_t> *size2 =
00659                dynamic_cast<TParameter<Long64_t>*> (userMap->GetValue(user->String().Data()));
00660             if (size2)
00661                Printf("  User %s: %lld B = %.2f GB", user->String().Data(), size2->GetVal(),
00662                                             (Float_t) size2->GetVal() / DSM_ONE_GB);
00663          }
00664 
00665          Printf("------------------------------------------------------");
00666       }
00667    }
00668 }
00669 
00670 //______________________________________________________________________________
00671 void TDataSetManager::MonitorUsedSpace(TVirtualMonitoringWriter *monitoring)
00672 {
00673    //
00674    // Log info to the monitoring server
00675 
00676    Info("MonitorUsedSpace", "sending used space to monitoring server");
00677 
00678    TIter iter(&fUserUsed);
00679    TObjString *group = 0;
00680    while ((group = dynamic_cast<TObjString*> (iter.Next()))) {
00681       TMap *userMap = dynamic_cast<TMap*> (fUserUsed.GetValue(group->String()));
00682       TParameter<Long64_t> *size =
00683          dynamic_cast<TParameter<Long64_t>*> (fGroupUsed.GetValue(group->String()));
00684 
00685       if (!userMap || !size)
00686          continue;
00687 
00688       TList *list = new TList;
00689       list->SetOwner();
00690       list->Add(new TParameter<Long64_t>("_TOTAL_", size->GetVal()));
00691       Long64_t groupQuota = GetGroupQuota(group->String());
00692       if (groupQuota != -1)
00693          list->Add(new TParameter<Long64_t>("_QUOTA_", groupQuota));
00694 
00695       TIter iter2(userMap);
00696       TObjString *user = 0;
00697       while ((user = dynamic_cast<TObjString*> (iter2.Next()))) {
00698          TParameter<Long64_t> *size2 =
00699             dynamic_cast<TParameter<Long64_t>*> (userMap->GetValue(user->String().Data()));
00700          if (!size2)
00701             continue;
00702          list->Add(new TParameter<Long64_t>(user->String().Data(), size2->GetVal()));
00703       }
00704 
00705       monitoring->SendParameters(list, group->String());
00706       delete list;
00707    }
00708 }
00709 
00710 //______________________________________________________________________________
00711 Long64_t TDataSetManager::GetGroupUsed(const char *group)
00712 {
00713    //
00714    // Returns the used space of that group
00715 
00716    if (fgCommonDataSetTag == group)
00717       group = fCommonGroup;
00718 
00719    TParameter<Long64_t> *size =
00720       dynamic_cast<TParameter<Long64_t>*> (fGroupUsed.GetValue(group));
00721    if (!size) {
00722       if (gDebug > 0)
00723          Info("GetGroupUsed", "group %s not found", group);
00724       return 0;
00725    }
00726 
00727    return size->GetVal();
00728 }
00729 
00730 //______________________________________________________________________________
00731 Long64_t TDataSetManager::GetGroupQuota(const char *group)
00732 {
00733    //
00734    // returns the quota a group is allowed to have
00735 
00736    if (fgCommonDataSetTag == group)
00737       group = fCommonGroup;
00738 
00739    TParameter<Long64_t> *value =
00740       dynamic_cast<TParameter<Long64_t>*> (fGroupQuota.GetValue(group));
00741    if (!value) {
00742       if (gDebug > 0)
00743          Info("GetGroupQuota", "group %s not found", group);
00744       return 0;
00745    }
00746    return value->GetVal();
00747 }
00748 
00749 //______________________________________________________________________________
00750 void TDataSetManager::UpdateUsedSpace()
00751 {
00752    // updates the used space maps
00753 
00754    AbstractMethod("UpdateUsedSpace");
00755 }
00756 
00757 //______________________________________________________________________________
00758 Int_t TDataSetManager::RegisterDataSet(const char *,
00759                                        TFileCollection *, const char *)
00760 {
00761    // Register a dataset, perfoming quota checkings, if needed.
00762    // Returns 0 on success, -1 on failure
00763 
00764    AbstractMethod("RegisterDataSet");
00765    return -1;
00766 }
00767 
00768 //______________________________________________________________________________
00769 Int_t TDataSetManager::NotifyUpdate(const char * /*group*/,
00770                                     const char * /*user*/,
00771                                     const char * /*dspath*/,
00772                                     Long_t /*mtime*/,
00773                                     const char * /*checksum*/)
00774 {
00775    // Save into the <datasetdir>/dataset.list file the name of the last updated
00776    // or created or modified dataset
00777    // Returns 0 on success, -1 on error
00778 
00779    AbstractMethod("NotifyUpdate");
00780    return -1;
00781 }
00782 
00783 //______________________________________________________________________________
00784 Int_t TDataSetManager::ClearCache(const char * /*uri*/)
00785 {
00786    // Clear cached information matching uri
00787 
00788    AbstractMethod("ClearCache");
00789    return -1;
00790 }
00791 
00792 //______________________________________________________________________________
00793 Int_t TDataSetManager::ShowCache(const char * /*uri*/)
00794 {
00795    // Show cached information matching uri
00796 
00797    AbstractMethod("ShowCache");
00798    return -1;
00799 }
00800 
00801 //______________________________________________________________________________
00802 TString TDataSetManager::CreateUri(const char *dsGroup, const char *dsUser,
00803                                         const char *dsName, const char *dsObjPath)
00804 {
00805    // Creates URI for the dataset manger in the form '[[/dsGroup/]dsUser/]dsName[#dsObjPath]',
00806    // The optional dsObjPath can be in the form [subdir/]objname]'.
00807 
00808    TString uri;
00809 
00810    if (dsGroup && strlen(dsGroup) > 0) {
00811       if (dsUser && strlen(dsUser) > 0) {
00812          uri += Form("/%s/%s/", dsGroup, dsUser);
00813       } else {
00814          uri += Form("/%s/*/", dsGroup);
00815       }
00816    } else if (dsUser && strlen(dsUser) > 0) {
00817       uri += Form("%s/", dsUser);
00818    }
00819    if (dsName && strlen(dsName) > 0)
00820       uri += dsName;
00821    if (dsObjPath && strlen(dsObjPath) > 0)
00822       uri += Form("#%s", dsObjPath);
00823 
00824    // Done
00825    return uri;
00826 }
00827 
00828 //______________________________________________________________________________
00829 Bool_t TDataSetManager::ParseUri(const char *uri,
00830                                  TString *dsGroup, TString *dsUser,
00831                                  TString *dsName, TString *dsTree,
00832                                  Bool_t onlyCurrent, Bool_t wildcards)
00833 {
00834    // Parses a (relative) URI that describes a DataSet on the cluster.
00835    // The input 'uri' should be in the form '[[/group/]user/]dsname[#[subdir/]objname]',
00836    //  where 'objname' is the name of the object (e.g. the tree name) and the 'subdir'
00837    // is the directory in the file wher it should be looked for.
00838    // After resolving against a base URI consisting of proof://masterhost/group/user/
00839    // - meaning masterhost, group and user of the current session -
00840    // the path is checked to contain exactly three elements separated by '/':
00841    // group/user/dsname
00842    // If wildcards, '*' is allowed in group and user and dsname is allowed to be empty.
00843    // If onlyCurrent, only group and user of current session are allowed.
00844    // Only non-null parameters are filled by this function.
00845    // Returns kTRUE in case of success.
00846 
00847    // Append trailing slash if missing when wildcards are enabled
00848    TString uristr(uri);
00849    Int_t pc = 0;
00850    if (wildcards && uristr.Length() > 0) {
00851       pc = uristr.CountChar('/');
00852       Bool_t endsl = uristr.EndsWith("/") ? kTRUE : kFALSE;
00853       Bool_t beginsl = uristr.BeginsWith("/") ? kTRUE : kFALSE;
00854       if (beginsl) {
00855          if (pc == 1) uristr += "/*/";
00856          if (pc == 2 && endsl) uristr += "*/";
00857          if (pc == 2 && !endsl) uristr += "/";
00858       }
00859    }
00860 
00861    // Resolve given URI agains the base
00862    TUri resolved = TUri::Transform(uristr, fBase);
00863    if (resolved.HasQuery())
00864       Info ("ParseUri", "URI query part <%s> ignored", resolved.GetQuery().Data());
00865 
00866    TString path(resolved.GetPath());
00867    // Must be in the form /group/user/dsname
00868    if ((pc = path.CountChar('/')) != 3) {
00869       if (!TestBit(TDataSetManager::kIsSandbox)) {
00870          Error ("ParseUri", "illegal dataset path: '%s'", uri);
00871          return kFALSE;
00872       } else if (pc >= 0 && pc < 3) {
00873          // Add missing slashes
00874          TString sls("/");
00875          if (pc == 2) {
00876             sls = "/";
00877          } else if (pc == 1) {
00878             sls.Form("/%s/", fGroup.Data());
00879          } else if (pc == 0) {
00880             sls.Form("/%s/%s/", fGroup.Data(), fUser.Data());
00881          }
00882          path.Insert(0, sls);
00883       }
00884    }
00885    if (gDebug > 1)
00886       Info("ParseUri", "path: '%s'", path.Data());
00887 
00888    // Get individual values from tokens
00889    Int_t from = 1;
00890    TString group, user, name;
00891    path.Tokenize(group, from, "/");
00892    path.Tokenize(user, from, "/");
00893    path.Tokenize(name, from, "/");
00894 
00895    // The fragment may contain the subdir and the object name in the form '[subdir/]objname'
00896    TString tree = resolved.GetFragment();
00897    if (tree.EndsWith("/"))
00898       tree.Remove(tree.Length()-1);
00899 
00900    if (gDebug > 1)
00901       Info("ParseUri", "group: '%s', user: '%s', dsname:'%s', seg: '%s'",
00902                               group.Data(), user.Data(), name.Data(), tree.Data());
00903 
00904    // Check for unwanted use of wildcards
00905    if ((user == "*" || group == "*") && !wildcards) {
00906       Error ("ParseUri", "no wildcards allowed for user/group in this context (uri: '%s')", uri);
00907       return kFALSE;
00908    }
00909 
00910    // dsname may only be empty if wildcards expected
00911    if (name.IsNull() && !wildcards) {
00912       Error ("ParseUri", "DataSet name is empty");
00913       return kFALSE;
00914    }
00915 
00916    // Construct regexp whitelist for checking illegal characters in user/group
00917    TPRegexp wcExp (wildcards ? "^(?:[A-Za-z0-9-*_.]*|[*])$" : "^[A-Za-z0-9-_.]*$");
00918 
00919    // Check for illegal characters in all components
00920    if (!wcExp.Match(group)) {
00921       Error("ParseUri", "illegal characters in group (uri: '%s', group: '%s')", uri, group.Data());
00922       return kFALSE;
00923    }
00924 
00925    if (!wcExp.Match(user)) {
00926       Error("ParseUri", "illegal characters in user (uri: '%s', user: '%s')", uri, user.Data());
00927       return kFALSE;
00928    }
00929 
00930    // Construct regexp whitelist for checking illegal characters in name
00931    if (!wcExp.Match(name)) {
00932       Error("ParseUri", "illegal characters in name (uri: '%s', name: '%s')", uri, name.Data());
00933       return kFALSE;
00934    }
00935 
00936    if (tree.Contains(TRegexp("[^A-Za-z0-9-/_]"))) {
00937       Error("ParseUri", "Illegal characters in subdir/object name (uri: '%s', obj: '%s')", uri, tree.Data());
00938       return kFALSE;
00939    }
00940 
00941    // Check user & group
00942    if (onlyCurrent && (group.CompareTo(fGroup) || user.CompareTo(fUser))) {
00943       Error("ParseUri", "only datasets from your group/user allowed");
00944       return kFALSE;
00945    }
00946 
00947    // fill parameters passed by reference, if defined
00948    if (dsGroup)
00949       *dsGroup = group;
00950    if (dsUser)
00951       *dsUser = user;
00952    if (dsName)
00953       *dsName = name;
00954    if (dsTree)
00955       *dsTree = tree;
00956 
00957    return kTRUE;
00958 }
00959 
00960 //______________________________________________________________________________
00961 TMap *TDataSetManager::GetSubDataSets(const char *ds, const char *exclude)
00962 {
00963    // Partition dataset 'ds' accordingly to the servers.
00964    // The returned TMap contains:
00965    //                <server> --> <subdataset> (TFileCollection)
00966    // where <subdataset> is the subset of 'ds' on <server>
00967    // The partitioning is done using all the URLs in the TFileInfo's, so the
00968    // resulting datasets are not mutually exclusive.
00969    // The string 'exclude' contains a comma-separated list of servers to exclude
00970    // from the map.
00971 
00972    TMap *map = (TMap *)0;
00973 
00974    if (!ds || strlen(ds) <= 0) {
00975       Info("GetDataSets", "dataset name undefined!");
00976       return map;
00977    }
00978 
00979    // Get the dataset
00980    TFileCollection *fc = GetDataSet(ds);
00981    if (!fc) {
00982       Info("GetDataSets", "could not retrieve the dataset '%s'", ds);
00983       return map;
00984    }
00985 
00986    // Get the subset
00987    if (!(map = fc->GetFilesPerServer(exclude))) {
00988       if (gDebug > 0)
00989          Info("GetDataSets", "could not get map for '%s'", ds);
00990    }
00991 
00992    // Cleanup
00993    delete fc;
00994 
00995    // Done
00996    return map;
00997 }
00998 
00999 //______________________________________________________________________________
01000 void TDataSetManager::PrintDataSet(TFileCollection *fc, Int_t popt)
01001 {
01002    // Formatted printout of the content of TFileCollection 'fc'.
01003    // Options in the form
01004    //           popt = u * 10 + f
01005    //     f    0 => header only, 1 => header + files
01006    //   when printing files
01007    //     u    0 => print file name only, 1 => print full URL
01008 
01009    if (!fc) return;
01010 
01011    Int_t f = popt%10;
01012    Int_t u = popt - 10 * f;
01013 
01014    Printf("+++");
01015    if (fc->GetTitle() && (strlen(fc->GetTitle()) > 0)) {
01016       Printf("+++ Dumping: %s: ", fc->GetTitle());
01017    } else {
01018       Printf("+++ Dumping: %s: ", fc->GetName());
01019    }
01020    Printf("%s", fc->ExportInfo("+++ Summary:", 1)->GetName());
01021    if (f == 1) {
01022       Printf("+++ Files:");
01023       Int_t nf = 0;
01024       TIter nxfi(fc->GetList());
01025       TFileInfo *fi = 0;
01026       while ((fi = (TFileInfo *)nxfi())) {
01027          if (u == 1)
01028             Printf("+++ %5d. %s", ++nf, fi->GetCurrentUrl()->GetUrl());
01029          else
01030             Printf("+++ %5d. %s", ++nf, fi->GetCurrentUrl()->GetFile());
01031       }
01032    }
01033    Printf("+++");
01034 }
01035 
01036 //______________________________________________________________________________
01037 void TDataSetManager::ShowDataSets(const char *uri, const char *opt)
01038 {
01039    // Prints formatted information about the dataset 'uri'.
01040    // The type and format of output is driven by 'opt':
01041    //
01042    //   1. opt = "server:srv1[,srv2[,srv3[,...]]]"
01043    //            Print info about the subsets of 'uri' on servers srv1, srv2, ...
01044    //   2. opt = "servers[:exclude:srv1[,srv2[,srv3[,...]]]]"
01045    //            Print info about the subsets of 'uri' on all servers, except
01046    //            the ones in the exclude list srv1, srv2, ...
01047    //   3. opt = <any>
01048    //            Print info about all datasets matching 'uri'
01049    //
01050    //   If 'opt' contains 'full:' the list of files in the datasets are also printed.
01051    //   In case 3. this is enabled only if 'uri' matches a single dataset.
01052    //
01053    //   In case 3, if 'opt' contains
01054    //      'full:'      the list of files in the datasets are also printed.
01055    //      'forcescan:' the dataset are open to get the information; otherwise the
01056    //                   pre-processed information is used.
01057    //      'noheader:'  the labelling header is not printed; usefull when to chain
01058    //                   several printouts
01059    //      'noupdate:'  do not update the cache (which may be slow on very remote
01060    //                   servers)
01061    //      'refresh:'   refresh the information (requires appropriate credentials;
01062    //                   typically it can be done only for owned datasets)
01063 
01064    TFileCollection *fc = 0;
01065    TString o(opt);
01066    Int_t popt = 0;
01067    if (o.Contains("full:")) {
01068       o.ReplaceAll("full:","");
01069       popt = 1;
01070    }
01071    if (o.BeginsWith("server:")) {
01072       o.ReplaceAll("server:", "");
01073       TString srv;
01074       Int_t from = 0;
01075       while ((o.Tokenize(srv, from, ","))) {
01076          fc = GetDataSet(uri, srv.Data());
01077          PrintDataSet(fc, popt);
01078          delete fc;
01079       }
01080    } else if (o.BeginsWith("servers")) {
01081       o.ReplaceAll("servers", "");
01082       if (o.BeginsWith(":exclude:"))
01083          o.ReplaceAll(":exclude:", "");
01084       else
01085          o = "";
01086       TMap *dsmap = GetSubDataSets(uri, o.Data());
01087       if (dsmap) {
01088          TIter nxk(dsmap);
01089          TObject *k = 0;
01090          while ((k = nxk()) && (fc = (TFileCollection *) dsmap->GetValue(k))) {
01091             PrintDataSet(fc, popt);
01092          }
01093          delete dsmap;
01094       }
01095    } else {
01096       TString u(uri), grp, usr, dsn;
01097       // Support for "*" or "/*"
01098       if (u == "" || u == "*" || u == "/*" || u == "/*/" || u == "/*/*") u = "/*/*/";
01099       if (!ParseUri(u.Data(), &grp, &usr, &dsn, 0, kFALSE, kTRUE))
01100          Warning("ShowDataSets", "problems parsing URI '%s'", uri);
01101       // Scan the existing datasets and print the content
01102       UInt_t xopt = (UInt_t)(TDataSetManager::kPrint);
01103       if (o.Contains("forcescan:")) xopt |= (UInt_t)(TDataSetManager::kForceScan);
01104       if (o.Contains("noheader:")) xopt |= (UInt_t)(TDataSetManager::kNoHeaderPrint);
01105       if (o.Contains("noupdate:")) xopt |= (UInt_t)(TDataSetManager::kNoCacheUpdate);
01106       if (o.Contains("refresh:")) xopt |= (UInt_t)(TDataSetManager::kRefreshLs);
01107       if (!u.IsNull() && !u.Contains("*") && !grp.IsNull() && !usr.IsNull() && !dsn.IsNull()) {
01108          if (ExistsDataSet(uri)) {
01109             // Single dataset
01110             if (popt == 0) {
01111                // Quick listing
01112                GetDataSets(u.Data(), xopt);
01113             } else if ((fc = GetDataSet(uri))) {
01114                // Full print option
01115                PrintDataSet(fc, 10 + popt);
01116                delete fc;
01117             }
01118             return;
01119          }
01120          // Try all the directories
01121          TRegexp reg(grp, kTRUE), reu(usr, kTRUE);
01122          if (u.Index(reg) == kNPOS) grp = "*";
01123          if (u.Index(reu) == kNPOS) usr = "*";
01124          // Rebuild the uri
01125          u.Form("/%s/%s/%s", grp.Data(), usr.Data(), dsn.Data());
01126       }
01127       GetDataSets(u.Data(), xopt);
01128    }
01129 
01130    return;
01131 }
01132 
01133 //______________________________________________________________________________
01134 Int_t TDataSetManager::ScanDataSet(TFileCollection *dataset,
01135                                    Int_t fopt, Int_t sopt, Int_t ropt, Bool_t dbg,
01136                                    Int_t *touched, Int_t *opened, Int_t *disappeared,
01137                                    TList *flist, Long64_t avgsz, const char *mss,
01138                                    Int_t maxfiles, const char *stageopts)
01139 {
01140    // Go through the files in the specified dataset, selecting files according to
01141    // 'fopt' and doing on these files the actions described by 'sopt'.
01142    // If required, the information in 'dataset' is updated.
01143    //
01144    // The int fopt controls which files have to be processed (or added to the list
01145    // if ropt is 1 - see below); 'fopt' is defined in term of csopt and fsopt:
01146    //                    fopt = sign(fsopt) * csopt * 100 + fsopt
01147    // where 'fsopt' controls the actual selection
01148    //    -1              all files in the dataset
01149    //     0              process only files marked as 'non-staged'
01150    //   >=1              as 0 but files that are marked 'staged' are open
01151    //   >=2              as 1 but files that are marked 'staged' are touched
01152    //    10              process only files marked as 'staged'; files marked as 'non-staged'
01153    //                    are ignored
01154    // and 'csopt' controls if an actual check on the staged status (via TFileStager) is done
01155    //     0              check that the file is staged using TFileStager
01156    //     1              do not hard check the staged status
01157    // (example: use fopt = -101 to check the staged status of all the files, or fopt = 110
01158    //  to re-check the stage status of all the files marked as staged)
01159    //
01160    // If 'dbg' is true, some information about the ongoing operations is reguraly
01161    // printed; this can be useful when processing very large datasets, an operation
01162    // which can take a very long time.
01163    //
01164    // The int 'sopt' controls what is done on the selected files (this is effective only
01165    // if ropt is 0 or 2 - see below):
01166    //    -1              no action (fopt = 2 and sopt = -1 touches all staged files)
01167    //     0              do the full process: open the files and fill the meta-information
01168    //                    in the TFileInfo object, including the end-point URL
01169    //     1              only locate the files, by updating the end-point URL (uses TFileStager::Locate
01170    //                    which is must faster of an TFile::Open)
01171    //     2              issue a stage request on the files
01172    //
01173    // The int 'ropt' controls which actions are performed:
01174    //     0              do the full process: get list of files to process and process them
01175    //     1              get the list of files to be scanned and return it in flist
01176    //     2              process the files in flist (according to sopt)
01177    // When defined flist is under the responsability the caller.
01178    //
01179    // If avgsz > 0 it is used for the final update of the dataset global counters.
01180    //
01181    // If 'mss' is defined use it to initialize the stager (instead of the Url in the
01182    // TFileInfo objects)
01183    //
01184    // If maxfiles > 0, select for processing a maximum of 'filesmax' files (but if fopt is 1 or 2
01185    // all files marked as 'staged' are still open or touched)
01186    //
01187    // Return code
01188    //     1 dataset was not changed
01189    //     2 dataset was changed
01190    //
01191    // The number of touched, opened and disappeared files are returned in the respective
01192    // variables, if these are defined.
01193 
01194    // Max number of files
01195    if (maxfiles > -1 && dbg)
01196       ::Info("TDataSetManager::ScanDataSet", "processing a maximum of %d files", maxfiles);
01197 
01198    // File selection, Reopen and Touch options
01199    Bool_t allf     = (fopt == -1)               ? kTRUE : kFALSE;
01200    Bool_t checkstg = (fopt >= 100 || fopt < -1) ? kFALSE : kTRUE;
01201    if (fopt >= 0) fopt %= 100;
01202    Bool_t nonstgf  = (fopt >= 0 && fopt < 10)   ? kTRUE : kFALSE;
01203    Bool_t reopen   = (fopt >= 1 && fopt < 10)   ? kTRUE : kFALSE;
01204    Bool_t touch    = (fopt >= 2 && fopt < 10)   ? kTRUE : kFALSE;
01205    Bool_t stgf     = (fopt == 10)               ? kTRUE : kFALSE;
01206 
01207    // File processing options
01208    Bool_t noaction   = (sopt == -1) ? kTRUE : kFALSE;
01209    Bool_t fullproc   = (sopt == 0)  ? kTRUE : kFALSE;
01210    Bool_t locateonly = (sopt == 1)  ? kTRUE : kFALSE;
01211    Bool_t stageonly  = (sopt == 2)  ? kTRUE : kFALSE;
01212 
01213    // Run options
01214    Bool_t doall       = (ropt == 0) ? kTRUE : kFALSE;
01215    Bool_t getlistonly = (ropt == 1) ? kTRUE : kFALSE;
01216    Bool_t scanlist    = (ropt == 2) ? kTRUE : kFALSE;
01217    if (scanlist && !flist) {
01218       ::Error("TDataSetManager::ScanDataSet", "input list is mandatory for option 'scan file list'");
01219       return -1;
01220    }
01221 
01222    Int_t ftouched = 0;
01223    Int_t fopened = 0;
01224    Int_t fdisappeared = 0;
01225 
01226    Bool_t changed = kFALSE;
01227 
01228    TList *newStagedFiles = 0;
01229    TFileInfo *fileInfo = 0;
01230    TFileStager *stager = 0;
01231    Bool_t createStager = kFALSE;
01232 
01233    if (doall || getlistonly) {
01234 
01235       // Point to the list
01236       newStagedFiles = (!doall && getlistonly && flist) ? flist : new TList;
01237       if (newStagedFiles != flist) newStagedFiles->SetOwner(kFALSE);
01238 
01239       stager = (mss && strlen(mss) > 0) ? TFileStager::Open(mss) : 0;
01240       createStager = (stager) ? kFALSE : kTRUE;
01241 
01242       // Check which files have been staged, this can be replaced by a bulk command,
01243       // once it exists in the xrdclient
01244       TIter iter2(dataset->GetList());
01245       while ((fileInfo = (TFileInfo *) iter2())) {
01246 
01247          // For real time monitoring
01248          gSystem->DispatchOneEvent(kTRUE);
01249 
01250          if (!allf) {
01251 
01252             fileInfo->ResetUrl();
01253             if (!fileInfo->GetCurrentUrl()) {
01254                ::Error("TDataSetManager::ScanDataSet", "GetCurrentUrl() returned 0 for %s",
01255                                                       fileInfo->GetFirstUrl()->GetUrl());
01256                continue;
01257             }
01258 
01259             if (nonstgf && fileInfo->TestBit(TFileInfo::kStaged)) {
01260 
01261                // Skip files flagged as corrupted
01262                if (fileInfo->TestBit(TFileInfo::kCorrupted)) continue;
01263 
01264                // Skip if we are not asked to re-open the staged files
01265                if (!reopen) continue;
01266 
01267                // Set the URL removing the anchor (e.g. #AliESDs.root) because IsStaged()
01268                // and TFile::Open() with filetype=raw do not accept anchors
01269                TUrl *curl = fileInfo->GetCurrentUrl();
01270                const char *furl = curl->GetUrl();
01271                TString urlmod;
01272                if (TDataSetManager::CheckDataSetSrvMaps(curl, urlmod) && !(urlmod.IsNull()))
01273                   furl = urlmod.Data();
01274                TUrl url(furl);   
01275                url.SetAnchor("");
01276 
01277                // Notify
01278                if (dbg && (ftouched+fdisappeared) % 100 == 0)
01279                   ::Info("TDataSetManager::ScanDataSet", "opening %d: file: %s",
01280                         ftouched + fdisappeared, curl->GetUrl());
01281 
01282                // Check if file is still available, if touch is set actually read from the file
01283                TString uopt(url.GetOptions());
01284                uopt += "filetype=raw&mxredir=2";
01285                url.SetOptions(uopt.Data());
01286                TFile *file = TFile::Open(url.GetUrl());
01287                if (file) {
01288                   if (touch) {
01289                      // Actually access the file
01290                      char tmpChar = 0;
01291                      file->ReadBuffer(&tmpChar, 1);
01292                      // Count
01293                      ftouched++;
01294                   }
01295                   file->Close();
01296                   delete file;
01297                } else {
01298                   // File could not be opened, reset staged bit
01299                   if (dbg) ::Info("TDataSetManager::ScanDataSet", "file %s disappeared", url.GetUrl());
01300                   fileInfo->ResetBit(TFileInfo::kStaged);
01301                   fdisappeared++;
01302                   changed = kTRUE;
01303 
01304                   // Remove invalid URL, if other one left...
01305                   if (fileInfo->GetNUrls() > 1)
01306                      fileInfo->RemoveUrl(curl->GetUrl());
01307                }
01308                // Go to next
01309                continue;
01310 
01311             } else if (stgf && !(fileInfo->TestBit(TFileInfo::kStaged))) {
01312                // All staged files are processed: skip non staged
01313                continue;
01314             }
01315          }
01316 
01317          // Only open maximum number of 'new' files
01318          if (maxfiles > 0 && newStagedFiles->GetEntries() >= maxfiles)
01319             continue;
01320 
01321          // Hard check of the staged status, if required
01322          if (checkstg) {
01323             // Set the URL removing the anchor (e.g. #AliESDs.root) because IsStaged()
01324             // and TFile::Open() with filetype=raw do not accept anchors
01325             TUrl *curl = fileInfo->GetCurrentUrl();
01326             const char *furl = curl->GetUrl();
01327             TString urlmod;
01328             Bool_t mapped = kFALSE;
01329             if (TDataSetManager::CheckDataSetSrvMaps(curl, urlmod) && !(urlmod.IsNull())) {
01330                furl = urlmod.Data();
01331                mapped = kTRUE;
01332             }
01333             TUrl url(furl);
01334             url.SetAnchor("");
01335 
01336             // Get the stager (either the global one or from the URL)
01337             stager = createStager ? TFileStager::Open(url.GetUrl()) : stager;
01338 
01339             Bool_t result = kFALSE;
01340             if (stager) {
01341                result = stager->IsStaged(url.GetUrl());
01342                if (gDebug > 0)
01343                   ::Info("TDataSetManager::ScanDataSet", "IsStaged: %s: %d", url.GetUrl(), result);
01344                if (createStager)
01345                   SafeDelete(stager);
01346             } else {
01347                ::Warning("TDataSetManager::ScanDataSet",
01348                         "could not get stager instance for '%s'", url.GetUrl());
01349             }
01350 
01351             // Go to next in case of failure
01352             if (!result) {
01353                if (fileInfo->TestBit(TFileInfo::kStaged)) {
01354                   // Reset the bit
01355                   fileInfo->ResetBit(TFileInfo::kStaged);
01356                   changed = kTRUE;
01357                }
01358                continue;
01359             } else {
01360                if (!(fileInfo->TestBit(TFileInfo::kStaged))) {
01361                   // Set the bit
01362                   fileInfo->SetBit(TFileInfo::kStaged);
01363                   changed = kTRUE;
01364                }
01365             }
01366 
01367             // If the url was re-mapped add the new url in front of the list
01368             if (mapped) {
01369                url.SetOptions(curl->GetOptions());
01370                url.SetAnchor(curl->GetAnchor());
01371                fileInfo->AddUrl(url.GetUrl(), kTRUE);
01372             }
01373          }
01374 
01375          // Register the newly staged file
01376          if (!noaction) newStagedFiles->Add(fileInfo);
01377       }
01378       SafeDelete(stager);
01379 
01380       // If required to only get the list we are done
01381       if (getlistonly) {
01382          if (dbg && newStagedFiles->GetEntries() > 0)
01383             ::Info("TDataSetManager::ScanDataSet", " %d files appear to be newly staged",
01384                                                    newStagedFiles->GetEntries());
01385          if (!flist) SafeDelete(newStagedFiles);
01386          return ((changed) ? 2 : 1);
01387       }
01388    }
01389 
01390    if (!noaction && (doall || scanlist)) {
01391 
01392       // Point to the list
01393       newStagedFiles = (!doall && scanlist && flist) ? flist : newStagedFiles;
01394       if (newStagedFiles != flist) newStagedFiles->SetOwner(kFALSE);
01395 
01396       // loop over now staged files
01397       if (dbg && newStagedFiles->GetEntries() > 0)
01398          ::Info("TDataSetManager::ScanDataSet", "opening %d files that appear to be newly staged",
01399                                                 newStagedFiles->GetEntries());
01400 
01401       // If staging files, prepare the stager
01402       if (locateonly || stageonly) {
01403          stager = (mss && strlen(mss) > 0) ? TFileStager::Open(mss) : 0;
01404          createStager = (stager) ? kFALSE : kTRUE;
01405       }
01406 
01407       // Notify each 'fqnot' files (min 1, max 100)
01408       Int_t fqnot = (newStagedFiles->GetSize() > 10) ? newStagedFiles->GetSize() / 10 : 1;
01409       if (fqnot > 100) fqnot = 100;
01410       Int_t count = 0;
01411       TIter iter3(newStagedFiles);
01412       while ((fileInfo = (TFileInfo *) iter3())) {
01413 
01414          if (dbg && (count%fqnot == 0))
01415             ::Info("TDataSetManager::ScanDataSet", "processing %d.'new' file: %s",
01416                                                    count, fileInfo->GetCurrentUrl()->GetUrl());
01417          count++;
01418 
01419          // For real time monitoring
01420          gSystem->DispatchOneEvent(kTRUE);
01421 
01422          Int_t rc = -1;
01423          // Set the URL removing the anchor (e.g. #AliESDs.root) because IsStaged()
01424          // and TFile::Open() with filetype=raw do not accept anchors
01425          TUrl *curl = fileInfo->GetCurrentUrl();
01426          const char *furl = curl->GetUrl();
01427          TString urlmod;
01428          Bool_t mapped = kFALSE;
01429          if (TDataSetManager::CheckDataSetSrvMaps(curl, urlmod) && !(urlmod.IsNull())) {
01430             furl = urlmod.Data();
01431             mapped = kTRUE;
01432          }
01433          TUrl url(furl);
01434          url.SetOptions("");
01435          url.SetAnchor("");
01436          // Point to the right stager
01437          if (createStager) {
01438             if (!stager || (stager && !stager->Matches(url.GetUrl()))) {
01439                SafeDelete(stager);
01440                if (!(stager = TFileStager::Open(url.GetUrl())) || !(stager->IsValid())) {
01441                   ::Error("TDataSetManager::ScanDataSet",
01442                            "could not get valid stager instance for '%s'", url.GetUrl());
01443                   continue;
01444                }
01445             }
01446          }
01447          // Locate the file, if just requested so
01448          if (locateonly) {
01449             TString eurl;
01450             if (stager && stager->Locate(url.GetUrl(), eurl) == 0) {
01451                TString opts(curl->GetOptions());
01452                TString anch(curl->GetAnchor());
01453                // Get the effective end-point Url
01454                curl->SetUrl(eurl);
01455                // Restore original options and anchor, if any
01456                curl->SetOptions(opts);
01457                curl->SetAnchor(anch);
01458                // Flag and count
01459                changed = kTRUE;
01460                fopened++;
01461             } else {
01462                // Failure
01463                ::Error("TDataSetManager::ScanDataSet", "could not locate %s", url.GetUrl());
01464             }
01465          } else if (stageonly) {
01466             if (stager && !(stager->IsStaged(url.GetUrl()))) {
01467                if (!(stager->Stage(url.GetUrl(), stageopts))) {
01468                   // Failure
01469                   ::Error("TDataSetManager::ScanDataSet",
01470                            "problems issuing stage request for %s", url.GetUrl());
01471                }
01472             }
01473          } else if (fullproc) {
01474             // Full file validation
01475             rc = -2;
01476             Bool_t doscan = kTRUE;
01477             if (checkstg) {
01478                doscan = kFALSE;
01479                if ((doall && fileInfo->TestBit(TFileInfo::kStaged)) ||
01480                    (stager && stager->IsStaged(url.GetUrl()))) doscan = kTRUE;
01481             }
01482             if (doscan) {
01483                if ((rc = TDataSetManager::ScanFile(fileInfo, dbg)) < -1) continue;
01484                changed = kTRUE;
01485             } else if (stager) {
01486                ::Warning("TDataSetManager::ScanDataSet",
01487                          "required file '%s' does not look as being online (staged)", url.GetUrl());
01488             }
01489             if (rc < 0) continue;
01490             // Count
01491             fopened++;
01492          }
01493       }
01494       if (newStagedFiles != flist) SafeDelete(newStagedFiles);
01495 
01496       dataset->RemoveDuplicates();
01497       dataset->Update(avgsz);
01498    }
01499 
01500    Int_t result = (changed) ? 2 : 1;
01501    if (result > 0 && dbg)
01502       ::Info("TDataSetManager::ScanDataSet", "%d files 'new'; %d files touched;"
01503                                              " %d files disappeared", fopened, ftouched, fdisappeared);
01504 
01505    // Fill outputs, if required
01506    if (touched) *touched = ftouched;
01507    if (opened) *opened = fopened;
01508    if (disappeared) *disappeared = fdisappeared;
01509 
01510    // For real time monitoring
01511    gSystem->DispatchOneEvent(kTRUE);
01512 
01513    return result;
01514 }
01515 
01516 //______________________________________________________________________________
01517 Int_t TDataSetManager::ScanFile(TFileInfo *fileinfo, Bool_t dbg)
01518 {
01519    // Open the file described by 'fileinfo' to extract the relevant meta-information.
01520    // Return 0 if OK, -2 if the file cannot be open, -1 if it is corrupted
01521 
01522    Int_t rc = -2;
01523    // We need an input
01524    if (!fileinfo) {
01525       ::Error("TDataSetManager::ScanFile", "undefined input (!)");
01526       return rc;
01527    }
01528 
01529    TUrl *url = fileinfo->GetCurrentUrl();
01530 
01531    TFile *file = 0;
01532 
01533    // To determine the size we have to open the file without the anchor
01534    // (otherwise we get the size of the contained file - in case of a zip archive)
01535    // We open in raw mode which makes sure that the opening succeeds, even if
01536    // the file is corrupted
01537    const char *furl = url->GetUrl();
01538    TString urlmod;
01539    if (TDataSetManager::CheckDataSetSrvMaps(url, urlmod) && !(urlmod.IsNull()))
01540       furl = urlmod.Data();
01541    TUrl urlNoAnchor(furl);
01542    urlNoAnchor.SetAnchor("");
01543    urlNoAnchor.SetOptions("filetype=raw");
01544    // Wait max 5 secs per file
01545    if (!(file = TFile::Open(urlNoAnchor.GetUrl(), "TIMEOUT=5"))) return rc;
01546 
01547    // OK, set the relevant flags
01548    rc = -1;
01549    fileinfo->SetBit(TFileInfo::kStaged);
01550 
01551    // Add url of the disk server in front of the list
01552    TUrl eurl(*(file->GetEndpointUrl()));
01553    eurl.SetOptions(url->GetOptions());
01554    eurl.SetAnchor(url->GetAnchor());
01555    fileinfo->AddUrl(eurl.GetUrl(), kTRUE);
01556 
01557    if (gDebug > 0) ::Info("TDataSetManager::ScanFile", "added URL %s", eurl.GetUrl());
01558 
01559    if (file->GetSize() > 0) fileinfo->SetSize(file->GetSize());
01560    fileinfo->SetUUID(file->GetUUID().AsString());
01561 
01562    file->Close();
01563    delete file;
01564 
01565    // Disable warnings when reading a tree without loading the corresponding library
01566    Int_t oldLevel = gErrorIgnoreLevel;
01567    gErrorIgnoreLevel = kError+1;
01568 
01569    // Wait max 5 secs per file
01570    if (!(file = TFile::Open(url->GetUrl(), "TIMEOUT=5"))) {
01571       // If the file could be opened before, but fails now it is corrupt...
01572       if (dbg) ::Info("TDataSetManager::ScanFile", "marking %s as corrupt", url->GetUrl());
01573       fileinfo->SetBit(TFileInfo::kCorrupted);
01574       // Set back old warning level
01575       gErrorIgnoreLevel = oldLevel;
01576       return rc;
01577    }
01578    rc = 0;
01579 
01580    // Loop over all entries and create/update corresponding metadata.
01581    // TODO If we cannot read some of the trees, is the file corrupted as well?
01582    if ((rc = TDataSetManager::FillMetaData(fileinfo, file, "/")) != 0) {
01583       ::Error("TDataSetManager::ScanFile",
01584               "problems processing the directory tree in looking for metainfo");
01585    }
01586    // Set back old warning level
01587    gErrorIgnoreLevel = oldLevel;
01588 
01589    file->Close();
01590    delete file;
01591 
01592    // Done
01593    return rc;
01594 }
01595 
01596 //_______________________________________________________________________________________
01597 Int_t TDataSetManager::FillMetaData(TFileInfo *fi, TDirectory *d, const char *rdir)
01598 {
01599    // Navigate the directory 'd' (and its subdirectories) looking for TTree objects.
01600    // Fill in the relevant metadata information in 'fi'. The name of the TFileInfoMeta
01601    // metadata entry will be "/dir1/dir2/.../tree_name".
01602    // Return 0 on success, -1 if any problem happens (object found in keys cannot be read,
01603    // for example)
01604 
01605    // Check inputs
01606    if (!fi || !d || !rdir) {
01607       ::Error("TDataSetManager::FillMetaData",
01608               "some inputs are invalid (fi:%p,d:%p,r:%s)", fi, d, rdir);
01609       return -1;
01610    }
01611 
01612    if (d->GetListOfKeys()) {
01613       TIter nxk(d->GetListOfKeys());
01614       TKey *k = 0;
01615       while ((k = dynamic_cast<TKey *> (nxk()))) {
01616 
01617          if (TClass::GetClass(k->GetClassName())->InheritsFrom(TDirectory::Class())) {
01618             // Get the directory
01619             TDirectory *sd = (TDirectory *) d->Get(k->GetName());
01620             if (!sd) {
01621                ::Error("TDataSetManager::FillMetaData", "cannot get sub-directory '%s'", k->GetName());
01622                return -1;
01623             }
01624             if (TDataSetManager::FillMetaData(fi, sd, TString::Format("%s%s/", rdir, k->GetName())) != 0) {
01625                ::Error("TDataSetManager::FillMetaData", "problems processing sub-directory '%s'", k->GetName());
01626                return -1;
01627             }
01628 
01629          } else {
01630             // We process only trees
01631             if (!TClass::GetClass(k->GetClassName())->InheritsFrom(TTree::Class())) continue;
01632 
01633             TString ks;
01634             ks.Form("%s%s", rdir, k->GetName());
01635 
01636             TFileInfoMeta *md = fi->GetMetaData(ks);
01637             if (!md) {
01638                // Create it
01639                md = new TFileInfoMeta(ks, k->GetClassName());
01640                fi->AddMetaData(md);
01641                if (gDebug > 0)
01642                   ::Info("TDataSetManager::FillMetaData", "created meta data for tree %s", ks.Data());
01643             }
01644             // Fill values
01645             TTree *t = dynamic_cast<TTree *> (d->Get(k->GetName()));
01646             if (t) {
01647                if (t->GetEntries() >= 0) {
01648                   md->SetEntries(t->GetEntries());
01649                   if (t->GetTotBytes() >= 0)
01650                      md->SetTotBytes(t->GetTotBytes());
01651                   if (t->GetZipBytes() >= 0)
01652                      md->SetZipBytes(t->GetZipBytes());
01653                }
01654             } else {
01655                ::Error("TDataSetManager::FillMetaData", "could not get tree '%s'", k->GetName());
01656                return -1;
01657             }
01658          }
01659       }
01660    }
01661    // Done
01662    return 0;
01663 }
01664 
01665 //_______________________________________________________________________________________
01666 TList *TDataSetManager::ParseDataSetSrvMaps(const TString &srvmaps)
01667 {
01668    // Create a server mapping list from the content of 'srvmaps'
01669    // Return the list (owned by the caller) or 0 if no valid info could be found)
01670 
01671    TList *srvmapslist = 0;
01672    if (srvmaps.IsNull()) {
01673       ::Warning("TDataSetManager::ParseDataSetSrvMaps",
01674                 "called with an empty string! - nothing to do");
01675       return srvmapslist;
01676    }
01677    TString srvmap, sf, st;
01678    Int_t from = 0, from1 = 0;
01679    while (srvmaps.Tokenize(srvmap, from, " ")) {
01680       sf = ""; st = "";
01681       if (srvmap.Contains("|")) {
01682          from1 = 0;
01683          if (srvmap.Tokenize(sf, from1, "|")) srvmap.Tokenize(st, from1, "|");
01684       } else {
01685          st = srvmap;
01686       }
01687       if (st.IsNull()) {
01688          ::Warning("TDataSetManager::ParseDataSetSrvMaps",
01689                    "parsing DataSet.SrvMaps: target must be defined"
01690                    " (token: %s) - ignoring", srvmap.Data());
01691          continue;
01692       } else if (!(st.EndsWith("/"))) {
01693          st += "/";
01694       }
01695       // TUrl if wildcards or TObjString
01696       TString sp;
01697       TUrl *u = 0;
01698       if (!(sf.IsNull()) && sf.Contains("*")) {
01699          u = new TUrl(sf);
01700          if (!(sf.BeginsWith(u->GetProtocol()))) u->SetProtocol("root");
01701          sp.Form(":%d", u->GetPort());
01702          if (!(sf.Contains(sp))) u->SetPort(1094);
01703          if (!TString(u->GetHost()).Contains("*")) SafeDelete(u);
01704       }
01705       if (!srvmapslist) srvmapslist = new TList;
01706       if (u) {
01707          srvmapslist->Add(new TPair(u, new TObjString(st)));
01708       } else {
01709          srvmapslist->Add(new TPair(new TObjString(sf), new TObjString(st)));
01710       }
01711    }
01712    // Done
01713    if (srvmapslist) srvmapslist->SetOwner(kTRUE);
01714    return srvmapslist;
01715 }
01716 
01717 //_______________________________________________________________________________________
01718 TList *TDataSetManager::GetDataSetSrvMaps()
01719 {
01720    // Static getter for server mapping list
01721 
01722    return fgDataSetSrvMaps;
01723 }
01724  
01725 //_______________________________________________________________________________________
01726 Bool_t TDataSetManager::CheckDataSetSrvMaps(TUrl *furl, TString &file1, TList *srvmaplist)
01727 {
01728    // Check if the dataset server mappings apply to the url defined by 'furl'.
01729    // Use srvmaplist if defined, else use the default list.
01730    // If yes, resolve the mapping into file1 and return kTRUE.
01731    // Otherwise return kFALSE.
01732 
01733    Bool_t replaced = kFALSE;
01734    if (!furl) return replaced;
01735 
01736    const char *file = furl->GetUrl();
01737    TList *mlist = (srvmaplist) ? srvmaplist : fgDataSetSrvMaps;
01738    if (mlist && mlist->GetSize() > 0) {
01739       TIter nxm(mlist);
01740       TPair *pr = 0;
01741       while ((pr = (TPair *) nxm())) {
01742          Bool_t replace = kFALSE;
01743          // If TUrl apply reg exp on host
01744          TUrl *u = dynamic_cast<TUrl *>(pr->Key());
01745          if (u) {
01746             if (!strcmp(u->GetProtocol(), furl->GetProtocol())) {
01747                Ssiz_t len;
01748                if (!strcmp(u->GetProtocol(), "file")) {
01749                   TRegexp re(u->GetFileAndOptions(), kTRUE);
01750                   if (re.Index(furl->GetFileAndOptions(), &len) == 0) replace = kTRUE;
01751                } else {
01752                   if (u->GetPort() == furl->GetPort()) {
01753                      TRegexp re(u->GetHost(), kTRUE);
01754                      if (re.Index(furl->GetHost(), &len) == 0) replace = kTRUE;
01755                   }
01756                }
01757             }
01758          } else {
01759             TObjString *os = dynamic_cast<TObjString *>(pr->Key());
01760             if (os) {
01761                if (os->GetString().IsNull() ||
01762                    !strncmp(file, os->GetName(), os->GetString().Length())) replace = kTRUE;
01763             }
01764          }
01765          if (replace) {
01766             TObjString *ost = dynamic_cast<TObjString *>(pr->Value());
01767             if (ost) {
01768                file1.Form("%s%s", ost->GetName(), furl->GetFileAndOptions());
01769                replaced = kTRUE;
01770                break;
01771             }
01772          }
01773       }
01774    }
01775    // Done
01776    return replaced;
01777 }
01778 
01779 //_______________________________________________________________________________________
01780 void TDataSetManager::SetScanCounters(Int_t t, Int_t o, Int_t d)
01781 {
01782    // Update scan counters
01783 
01784    fNTouchedFiles = (t > -1) ? t : fNTouchedFiles;
01785    fNOpenedFiles = (o > -1) ? o : fNOpenedFiles;
01786    fNDisappearedFiles = (d > -1) ? d : fNDisappearedFiles;
01787 }

Generated on Tue Jul 5 14:51:35 2011 for ROOT_528-00b_version by  doxygen 1.5.1