THDFSFile.cxx

Go to the documentation of this file.
00001 // @(#)root/hdfs:$Id: THDFSFile.cxx 36308 2010-10-12 07:13:29Z brun $
00002 // Author: Brian Bockelman 29/09/2009
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // THDFSFile                                                            //
00015 //                                                                      //
00016 // A THDFSFile is like a normal TFile except that it reads and writes   //
00017 // its data via the HDFS protocols.  For more information on HDFS, see  //
00018 // http://hadoop.apache.org/hdfs/.                                      //
00019 // This implementation interfaces with libhdfs, which is a JNI-based    //
00020 // library (i.e., it will start a Java JVM internally the first time    //
00021 // it is called).  At a minimum, you will need your environment's       //
00022 // $CLASSPATH variable set up properly to use.  Here's an example of    //
00023 // one way to properly set your classpath, assuming you use the OSG     //
00024 // distribution of Hadoop:                                              //
00025 //    $ source $HADOOP_CONF_DIR/hadoop-env.sh                           //
00026 //    $ export CLASSPATH=$HADOOP_CLASSPATH                              //
00027 // Additionally, you will need a valid libjvm in your $LD_LIBRARY_PATH  //
00028 // This is usually found in either:                                     //
00029 //    $JAVA_HOME/jre/lib/i386/server                                    //
00030 // or                                                                   //
00031 //    $JAVA_HOME/jre/lib/amd64/server                                   //
00032 // This file can only be used if hdfs support is compiled into ROOT.    //
00033 //                                                                      //
00034 // The HDFS URLs should be of the form:                                 //
00035 //    hdfs:///path/to/file/in/HDFS.root                                 //
00036 // Any host or port information will be ignored; this is taken from the //
00037 // node's HDFS configuration files.                                     //
00038 //                                                                      //
00039 //////////////////////////////////////////////////////////////////////////
00040 
00041 #include "syslog.h"
00042 #include "assert.h"
00043 
00044 #include "THDFSFile.h"
00045 #include "TError.h"
00046 #include "TSystem.h"
00047 #include "TROOT.h"
00048 
00049 #include "hdfs.h"
00050 //#include "hdfsJniHelper.h"
00051 
00052 // For now, we don't allow any write/fs modification operations.
00053 static const Bool_t R__HDFS_ALLOW_CHANGES = kFALSE;
00054 
00055 // The following snippet is used for developer-level debugging
00056 // Contributed by Pete Wyckoff of the HDFS project
00057 #define THDFSFile_TRACE
00058 #ifndef THDFSFile_TRACE
00059 #define TRACE(x) \
00060   Debug("THDFSFile", "%s", x);
00061 #else
00062 #define TRACE(x);
00063 #endif
00064 
00065 ClassImp(THDFSFile)
00066 
00067 //______________________________________________________________________________
THDFSFile::THDFSFile(const char *path, Option_t *option,
                     const char *ftitle, Int_t compress):
   TFile(path, "WEB", ftitle, compress)
{
   // Usual Constructor.  See the TFile constructor for details.
   // "WEB" is passed to the base class so TFile defers the actual open to
   // the SysOpen/SysRead/... overrides implemented below.

   fHdfsFH    = 0;
   fFS        = 0;
   fSize      = -1;   // lazily filled by SysSeek/SysStat via hdfsGetPathInfo
   fPath      = 0;
   fSysOffset = 0;

   fOption = option;
   fOption.ToUpper();
   Bool_t create   = (fOption == "CREATE") ? kTRUE : kFALSE;
   Bool_t recreate = (fOption == "RECREATE") ? kTRUE : kFALSE;
   Bool_t update   = (fOption == "UPDATE") ? kTRUE : kFALSE;
   Bool_t read     = (fOption == "READ") ? kTRUE : kFALSE;
   // Any unrecognized option falls back to read-only access.
   if (!create && !recreate && !update && !read) {
      read    = kTRUE;
      fOption = "READ";
   }

   // Connect to HDFS as the current user.  The namenode host/port come from
   // the node's HDFS configuration ("default"), never from the URL.
   Bool_t has_authn = kTRUE;

   if (has_authn) {
      UserGroup_t *ugi = gSystem->GetUserInfo(0);
      const char *user = (ugi->fUser).Data();
      const char * groups[1] = {(ugi->fGroup.Data())};
      fFS = hdfsConnectAsUser("default", 0, user, groups, 1);
      delete ugi;
   } else {
      fFS = hdfsConnect("default", 0);
   }

   if (fFS == 0) {
      SysError("THDFSFile", "HDFS client for %s cannot open the filesystem",
               path);
      goto zombie;
   }

   if (create || update || recreate) {
      Int_t mode = O_RDWR | O_CREAT;
      if (recreate) mode |= O_TRUNC;

#ifndef WIN32
      fD = SysOpen(path, mode, 0644);
#else
      fD = SysOpen(path, mode | O_BINARY, S_IREAD | S_IWRITE);
#endif
      if (fD == -1) {
         SysError("THDFSFile", "file %s can not be opened", path);
         goto zombie;
      }
      fWritable = kTRUE;
   } else {
#ifndef WIN32
      fD = SysOpen(path, O_RDONLY, 0644);
#else
      fD = SysOpen(path, O_RDONLY | O_BINARY, S_IREAD | S_IWRITE);
#endif
      if (fD == -1) {
         SysError("THDFSFile", "file %s can not be opened for reading", path);
         goto zombie;
      }
      fWritable = kFALSE;
   }

   Init(create || recreate);

   return;

zombie:
   // Error in opening file; make this a zombie
   MakeZombie();
   gDirectory = gROOT;
}
00145 
00146 //______________________________________________________________________________
THDFSFile::~THDFSFile()
{
   // Close and clean-up HDFS file.

   TRACE("destroy")

   // fPath was allocated with new[] in SysOpen.
   if (fPath)
      delete [] fPath;

   // We assume that the file is closed in SysClose
   // Explicitly release reference to HDFS filesystem object.
   // Turned off now due to compilation issues.
   // The very awkward way of releasing HDFS FS objects (by accessing JNI
   // internals) is going away in the next libhdfs version.
}
00162 
00163 //______________________________________________________________________________
00164 Int_t THDFSFile::SysRead(Int_t, void *buf, Int_t len)
00165 {
00166    // Read specified number of bytes from current offset into the buffer.
00167    // See documentation for TFile::SysRead().
00168 
00169    TRACE("READ")
00170    tSize num_read = hdfsPread(fFS, (hdfsFile)fHdfsFH, fSysOffset, buf, len);
00171    fSysOffset += len;
00172    if (num_read < 0) {
00173       gSystem->SetErrorStr(strerror(errno));
00174    }
00175    return num_read;
00176 }
00177 
00178 //______________________________________________________________________________
00179 Long64_t THDFSFile::SysSeek(Int_t, Long64_t offset, Int_t whence)
00180 {
00181    // Seek to a specified position in the file.  See TFile::SysSeek().
00182    // Note that THDFSFile does not support seeks when the file is open for write.
00183 
00184    TRACE("SEEK")
00185    if (whence == SEEK_SET)
00186       fSysOffset = offset;
00187    else if (whence == SEEK_CUR)
00188       fSysOffset += offset;
00189    else if (whence == SEEK_END) {
00190       if (offset > 0) {
00191          SysError("THDFSFile", "Unable to seek past end of file");
00192          return -1;
00193       }
00194       if (fSize == -1) {
00195          hdfsFileInfo *info = hdfsGetPathInfo(fFS, fPath);
00196          if (info != 0) {
00197             fSize = info->mSize;
00198             free(info);
00199          } else {
00200             SysError("THDFSFile", "Unable to seek to end of file");
00201             return -1;
00202          }
00203       }
00204       fSysOffset = fSize;
00205    } else {
00206       SysError("THDFSFile", "Unknown whence!");
00207       return -1;
00208    }
00209    return fSysOffset;
00210 }
00211 
00212 //______________________________________________________________________________
00213 Int_t THDFSFile::SysOpen(const char * pathname, Int_t flags, UInt_t)
00214 {
00215    // Open a file in HDFS.
00216 
00217    // This is given to us as a URL (hdfs://hadoop-name:9000//foo or
00218    // hdfs:///foo); convert this to a file name.
00219    TUrl url(pathname);
00220    const char * file = url.GetFile();
00221    size_t path_size = strlen(file);
00222    fPath = new char[path_size+1];
00223    if (fPath == 0) {
00224       SysError("THDFSFile", "Unable to allocate memory for path.");
00225    }
00226    strlcpy(fPath, file,path_size+1);
00227    if ((fHdfsFH = hdfsOpenFile(fFS, fPath, flags, 0, 0, 0)) == 0) {
00228       SysError("THDFSFile", "Unable to open file %s in HDFS", pathname);
00229       return -1;
00230    }
00231    return 1;
00232 }
00233 
00234 //______________________________________________________________________________
Int_t THDFSFile::SysClose(Int_t)
{
   // Close the file in HDFS.
   // Returns the result of hdfsCloseFile (0 on success, -1 on error).

   int result = hdfsCloseFile(fFS, (hdfsFile)fHdfsFH);
   // Drop our references; further Sys* calls on this object are invalid.
   fFS = 0;
   fHdfsFH = 0;
   return result;
}
00244 
00245 //______________________________________________________________________________
Int_t THDFSFile::SysWrite(Int_t, const void *, Int_t)
{
   // Write a buffer into the file; this is not supported currently.
   // Always fails with errno set to ENOSYS.

   errno = ENOSYS;
   return -1;
}
00253 
00254 //______________________________________________________________________________
00255 Int_t THDFSFile::SysStat(Int_t, Long_t* id, Long64_t* size, Long_t* flags, Long_t* modtime)
00256 {
00257    // Perform a stat on the HDFS file; see TFile::SysStat().
00258 
00259    *id = ::Hash(fPath);
00260 
00261    hdfsFileInfo *info = hdfsGetPathInfo(fFS, fPath);
00262    if (info != 0) {
00263       fSize = info->mSize;
00264       *size = fSize;
00265       if (info->mKind == kObjectKindFile)
00266          *flags = 0;
00267       else if (info->mKind == kObjectKindDirectory)
00268          *flags = 1;
00269       *modtime = info->mLastMod;
00270       free(info);
00271    } else {
00272       return 1;
00273    }
00274 
00275    return 0;
00276 }
00277 
00278 //______________________________________________________________________________
Int_t THDFSFile::SysSync(Int_t)
{
   // Sync remaining data to disk; Not supported by HDFS.
   // Always fails with errno set to ENOSYS.

   errno = ENOSYS;
   return -1;
}
00286 
00287 //______________________________________________________________________________
void THDFSFile::ResetErrno() const
{
   // ResetErrno; simply calls TSystem::ResetErrno().

   TSystem::ResetErrno();
}
00294 
00295 
00296 ClassImp(THDFSSystem)
00297 
00298 //______________________________________________________________________________
00299 THDFSSystem::THDFSSystem() : TSystem("-hdfs", "HDFS Helper System")
00300 {
00301 
00302    SetName("hdfs");
00303 
00304    Bool_t has_authn = kTRUE;
00305 
00306    if (has_authn) {
00307       UserGroup_t *ugi = gSystem->GetUserInfo(0);
00308       const char *user = (ugi->fUser).Data();
00309       const char * groups[1] = {(ugi->fGroup.Data())};
00310       fFH = hdfsConnectAsUser("default", 0, user, groups, 1);
00311       delete ugi;
00312    } else {
00313       fFH = hdfsConnect("default", 0);
00314    }
00315 
00316    if (fFH == 0) {
00317       SysError("THDFSSystem", "HDFS client cannot open the filesystem");
00318       goto zombie;
00319    }
00320 
00321    fDirp = 0;
00322 
00323    return;
00324 
00325 zombie:
00326    // Error in opening file; make this a zombie
00327    MakeZombie();
00328    gDirectory = gROOT;
00329 
00330 }
00331 
00332 //______________________________________________________________________________
00333 Int_t THDFSSystem::MakeDirectory(const char * path)
00334 {
00335    // Make a directory.
00336    if (fFH != 0) {
00337       Error("MakeDirectory", "No filesystem handle (should never happen)");
00338       return -1;
00339    }
00340 
00341    if (R__HDFS_ALLOW_CHANGES == kTRUE) {
00342       return hdfsCreateDirectory(fFH, path);
00343    } else {
00344       return -1;
00345    }
00346 
00347 }
00348 
00349 //______________________________________________________________________________
00350 void *THDFSSystem::OpenDirectory(const char * path)
00351 {
00352    // Open a directory via hdfs. Returns an opaque pointer to a dir
00353    // structure. Returns 0 in case of error.
00354 
00355    if (fFH == 0) {
00356        Error("OpenDirectory", "No filesystem handle (should never happen)");
00357        return 0;
00358    }
00359 
00360    fDirp = 0;
00361 /*
00362    if (fDirp) {
00363       Error("OpenDirectory", "invalid directory pointer (should never happen)");
00364       fDirp = 0;
00365    }
00366 */
00367 
00368    hdfsFileInfo * dir = 0;
00369    if ((dir = hdfsGetPathInfo(fFH, path)) == 0) {
00370       return 0;
00371    }
00372    if (dir->mKind != kObjectKindDirectory) {
00373       return 0;
00374    }
00375 
00376    fDirp = (void *)hdfsListDirectory(fFH, path, &fDirEntries);
00377    fDirCtr = 0;
00378 
00379    fUrlp = new TUrl[fDirEntries];
00380 
00381    return fDirp;
00382 }
00383 
00384 //______________________________________________________________________________
00385 void THDFSSystem::FreeDirectory(void *dirp)
00386 {
00387    // Free directory via httpd.
00388 
00389    if (fFH == 0) {
00390       Error("FreeDirectory", "No filesystem handle (should never happen)");
00391       return;
00392    }
00393    if (dirp != fDirp) {
00394       Error("FreeDirectory", "invalid directory pointer (should never happen)");
00395       return;
00396    }
00397    if (fUrlp != 0) {
00398       delete fUrlp;
00399    }
00400 
00401    hdfsFreeFileInfo((hdfsFileInfo *)fDirp, fDirEntries);
00402    fDirp=0;
00403 }
00404 
00405 //______________________________________________________________________________
00406 const char *THDFSSystem::GetDirEntry(void *dirp)
00407 {
00408    // Get directory entry via httpd. Returns 0 in case no more entries.
00409 
00410    if (fFH == 0) {
00411       Error("GetDirEntry", "No filesystem handle (should never happen)");
00412       return 0;
00413    }
00414    if (dirp != fDirp) {
00415       Error("GetDirEntry", "invalid directory pointer (should never happen)");
00416       return 0;
00417    }
00418    if (dirp == 0) {
00419       Error("GetDirEntry", "Passed an invalid directory pointer.");
00420       return 0;
00421    }
00422 
00423    if (fDirCtr == fDirEntries-1) {
00424       return 0;
00425    }
00426 
00427    hdfsFileInfo *fileInfo = ((hdfsFileInfo *)dirp) + fDirCtr;
00428    fUrlp[fDirCtr].SetUrl(fileInfo->mName);
00429    const char * result = fUrlp[fDirCtr].GetFile();
00430    TUrl tempUrl;
00431    tempUrl.SetUrl("hdfs:///");
00432    tempUrl.SetFile(result);
00433    fUrlp[fDirCtr].SetUrl(tempUrl.GetUrl());
00434    result = fUrlp[fDirCtr].GetUrl();
00435    fDirCtr++;
00436 
00437    return result;
00438 }
00439 
00440 //______________________________________________________________________________
00441 Int_t THDFSSystem::GetPathInfo(const char *path, FileStat_t &buf)
00442 {
00443    // Get info about a file. Info is returned in the form of a FileStat_t
00444    // structure (see TSystem.h).
00445    // The function returns 0 in case of success and 1 if the file could
00446    // not be stat'ed.
00447 
00448    if (fFH == 0) {
00449       Error("GetPathInfo", "No filesystem handle (should never happen)");
00450       return 1;
00451    }
00452    hdfsFileInfo *fileInfo = hdfsGetPathInfo(fFH, path);
00453 
00454    if (fileInfo == 0)
00455       return 1;
00456 
00457    buf.fDev    = 0;
00458    buf.fIno    = 0;
00459    buf.fMode   = fileInfo->mPermissions;
00460    buf.fUid    = gSystem->GetUid(fileInfo->mOwner);
00461    buf.fGid    = gSystem->GetGid(fileInfo->mGroup);
00462    buf.fSize   = fileInfo->mSize;
00463    buf.fMtime  = fileInfo->mLastAccess;
00464    buf.fIsLink = kFALSE;
00465 
00466    return 0;
00467 }
00468 
00469 //______________________________________________________________________________
00470 Bool_t THDFSSystem::AccessPathName(const char *path, EAccessMode mode)
00471 {
00472    // Returns FALSE if one can access a file using the specified access mode.
00473    // Mode is the same as for the Unix access(2) function.
00474    // Attention, bizarre convention of return value!!
00475 
00476    if (mode & kExecutePermission || mode & kWritePermission)
00477        return kTRUE;
00478 
00479    if (fFH == 0) {
00480       Error("AccessPathName", "No filesystem handle (should never happen)");
00481       return kTRUE;
00482    }
00483 
00484    if (hdfsExists(fFH, path) == 0)
00485       return kFALSE;
00486    else
00487       return kTRUE;
00488 }
00489 
00490 //______________________________________________________________________________
00491 Int_t THDFSSystem::Unlink(const char * path)
00492 {
00493    // Unlink, i.e. remove, a file or directory. Returns 0 when succesfull,
00494    // -1 in case of failure.
00495 
00496    if (fFH == 0) {
00497       Error("Unlink", "No filesystem handle (should never happen)");
00498       return kTRUE;
00499    }
00500 
00501    if (R__HDFS_ALLOW_CHANGES == kTRUE) {
00502       return hdfsDelete(fFH, path);
00503    } else {
00504       return -1;
00505    }
00506 }

Generated on Tue Jul 5 14:28:35 2011 for ROOT_528-00b_version by  doxygen 1.5.1