00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include "syslog.h"
00042 #include "assert.h"
00043
00044 #include "THDFSFile.h"
00045 #include "TError.h"
00046 #include "TSystem.h"
00047 #include "TROOT.h"
00048
00049 #include "hdfs.h"
00050
00051
00052
// Compile-time switch read by THDFSSystem::MakeDirectory() and
// THDFSSystem::Unlink() in this file: while it is kFALSE those methods return
// -1 instead of calling hdfsCreateDirectory() / hdfsDelete().
00053 static const Bool_t R__HDFS_ALLOW_CHANGES = kFALSE;
00054
00055
00056
// TRACE(x) helper used at the top of several THDFSFile methods.
// THDFSFile_TRACE is defined on the next line, so the #ifndef branch is
// skipped and TRACE(x) expands to a lone ';' (i.e. tracing is compiled out).
// NOTE(review): the guard reads as inverted (defining the symbol switches the
// Debug() output off) - confirm the intent before changing it.
00057 #define THDFSFile_TRACE
00058 #ifndef THDFSFile_TRACE
00059 #define TRACE(x) \
00060 Debug("THDFSFile", "%s", x);
00061 #else
00062 #define TRACE(x);
00063 #endif
00064
// ClassImp macro invocation for THDFSFile (the macro itself comes from the
// ROOT headers and is not visible in this file).
00065 ClassImp(THDFSFile)
00066
00067
00068 THDFSFile::THDFSFile(const char *path, Option_t *option,
00069 const char *ftitle, Int_t compress):
00070 TFile(path, "WEB", ftitle, compress)
00071 {
00072
00073
00074 fHdfsFH = 0;
00075 fFS = 0;
00076 fSize = -1;
00077 fPath = 0;
00078 fSysOffset = 0;
00079
00080 fOption = option;
00081 fOption.ToUpper();
00082 Bool_t create = (fOption == "CREATE") ? kTRUE : kFALSE;
00083 Bool_t recreate = (fOption == "RECREATE") ? kTRUE : kFALSE;
00084 Bool_t update = (fOption == "UPDATE") ? kTRUE : kFALSE;
00085 Bool_t read = (fOption == "READ") ? kTRUE : kFALSE;
00086 if (!create && !recreate && !update && !read) {
00087 read = kTRUE;
00088 fOption = "READ";
00089 }
00090
00091 Bool_t has_authn = kTRUE;
00092
00093 if (has_authn) {
00094 UserGroup_t *ugi = gSystem->GetUserInfo(0);
00095 const char *user = (ugi->fUser).Data();
00096 const char * groups[1] = {(ugi->fGroup.Data())};
00097 fFS = hdfsConnectAsUser("default", 0, user, groups, 1);
00098 delete ugi;
00099 } else {
00100 fFS = hdfsConnect("default", 0);
00101 }
00102
00103 if (fFS == 0) {
00104 SysError("THDFSFile", "HDFS client for %s cannot open the filesystem",
00105 path);
00106 goto zombie;
00107 }
00108
00109 if (create || update || recreate) {
00110 Int_t mode = O_RDWR | O_CREAT;
00111 if (recreate) mode |= O_TRUNC;
00112
00113 #ifndef WIN32
00114 fD = SysOpen(path, mode, 0644);
00115 #else
00116 fD = SysOpen(path, mode | O_BINARY, S_IREAD | S_IWRITE);
00117 #endif
00118 if (fD == -1) {
00119 SysError("THDFSFile", "file %s can not be opened", path);
00120 goto zombie;
00121 }
00122 fWritable = kTRUE;
00123 } else {
00124 #ifndef WIN32
00125 fD = SysOpen(path, O_RDONLY, 0644);
00126 #else
00127 fD = SysOpen(path, O_RDONLY | O_BINARY, S_IREAD | S_IWRITE);
00128 #endif
00129 if (fD == -1) {
00130 SysError("THDFSFile", "file %s can not be opened for reading", path);
00131 goto zombie;
00132 }
00133 fWritable = kFALSE;
00134 }
00135
00136 Init(create || recreate);
00137
00138 return;
00139
00140 zombie:
00141
00142 MakeZombie();
00143 gDirectory = gROOT;
00144 }
00145
00146
00147 THDFSFile::~THDFSFile()
00148 {
00149
00150
00151 TRACE("destroy")
00152
00153 if (fPath)
00154 delete [] fPath;
00155
00156
00157
00158
00159
00160
00161 }
00162
00163
00164 Int_t THDFSFile::SysRead(Int_t, void *buf, Int_t len)
00165 {
00166
00167
00168
00169 TRACE("READ")
00170 tSize num_read = hdfsPread(fFS, (hdfsFile)fHdfsFH, fSysOffset, buf, len);
00171 fSysOffset += len;
00172 if (num_read < 0) {
00173 gSystem->SetErrorStr(strerror(errno));
00174 }
00175 return num_read;
00176 }
00177
00178
00179 Long64_t THDFSFile::SysSeek(Int_t, Long64_t offset, Int_t whence)
00180 {
00181
00182
00183
00184 TRACE("SEEK")
00185 if (whence == SEEK_SET)
00186 fSysOffset = offset;
00187 else if (whence == SEEK_CUR)
00188 fSysOffset += offset;
00189 else if (whence == SEEK_END) {
00190 if (offset > 0) {
00191 SysError("THDFSFile", "Unable to seek past end of file");
00192 return -1;
00193 }
00194 if (fSize == -1) {
00195 hdfsFileInfo *info = hdfsGetPathInfo(fFS, fPath);
00196 if (info != 0) {
00197 fSize = info->mSize;
00198 free(info);
00199 } else {
00200 SysError("THDFSFile", "Unable to seek to end of file");
00201 return -1;
00202 }
00203 }
00204 fSysOffset = fSize;
00205 } else {
00206 SysError("THDFSFile", "Unknown whence!");
00207 return -1;
00208 }
00209 return fSysOffset;
00210 }
00211
00212
00213 Int_t THDFSFile::SysOpen(const char * pathname, Int_t flags, UInt_t)
00214 {
00215
00216
00217
00218
00219 TUrl url(pathname);
00220 const char * file = url.GetFile();
00221 size_t path_size = strlen(file);
00222 fPath = new char[path_size+1];
00223 if (fPath == 0) {
00224 SysError("THDFSFile", "Unable to allocate memory for path.");
00225 }
00226 strlcpy(fPath, file,path_size+1);
00227 if ((fHdfsFH = hdfsOpenFile(fFS, fPath, flags, 0, 0, 0)) == 0) {
00228 SysError("THDFSFile", "Unable to open file %s in HDFS", pathname);
00229 return -1;
00230 }
00231 return 1;
00232 }
00233
00234
00235 Int_t THDFSFile::SysClose(Int_t)
00236 {
00237
00238
00239 int result = hdfsCloseFile(fFS, (hdfsFile)fHdfsFH);
00240 fFS = 0;
00241 fHdfsFH = 0;
00242 return result;
00243 }
00244
00245
00246 Int_t THDFSFile::SysWrite(Int_t, const void *, Int_t)
00247 {
00248
00249
00250 errno = ENOSYS;
00251 return -1;
00252 }
00253
00254
00255 Int_t THDFSFile::SysStat(Int_t, Long_t* id, Long64_t* size, Long_t* flags, Long_t* modtime)
00256 {
00257
00258
00259 *id = ::Hash(fPath);
00260
00261 hdfsFileInfo *info = hdfsGetPathInfo(fFS, fPath);
00262 if (info != 0) {
00263 fSize = info->mSize;
00264 *size = fSize;
00265 if (info->mKind == kObjectKindFile)
00266 *flags = 0;
00267 else if (info->mKind == kObjectKindDirectory)
00268 *flags = 1;
00269 *modtime = info->mLastMod;
00270 free(info);
00271 } else {
00272 return 1;
00273 }
00274
00275 return 0;
00276 }
00277
00278
00279 Int_t THDFSFile::SysSync(Int_t)
00280 {
00281
00282
00283 errno = ENOSYS;
00284 return -1;
00285 }
00286
00287
00288 void THDFSFile::ResetErrno() const
00289 {
00290
00291
00292 TSystem::ResetErrno();
00293 }
00294
00295
00296 ClassImp(THDFSSystem)
00297
00298
00299 THDFSSystem::THDFSSystem() : TSystem("-hdfs", "HDFS Helper System")
00300 {
00301
00302 SetName("hdfs");
00303
00304 Bool_t has_authn = kTRUE;
00305
00306 if (has_authn) {
00307 UserGroup_t *ugi = gSystem->GetUserInfo(0);
00308 const char *user = (ugi->fUser).Data();
00309 const char * groups[1] = {(ugi->fGroup.Data())};
00310 fFH = hdfsConnectAsUser("default", 0, user, groups, 1);
00311 delete ugi;
00312 } else {
00313 fFH = hdfsConnect("default", 0);
00314 }
00315
00316 if (fFH == 0) {
00317 SysError("THDFSSystem", "HDFS client cannot open the filesystem");
00318 goto zombie;
00319 }
00320
00321 fDirp = 0;
00322
00323 return;
00324
00325 zombie:
00326
00327 MakeZombie();
00328 gDirectory = gROOT;
00329
00330 }
00331
00332
00333 Int_t THDFSSystem::MakeDirectory(const char * path)
00334 {
00335
00336 if (fFH != 0) {
00337 Error("MakeDirectory", "No filesystem handle (should never happen)");
00338 return -1;
00339 }
00340
00341 if (R__HDFS_ALLOW_CHANGES == kTRUE) {
00342 return hdfsCreateDirectory(fFH, path);
00343 } else {
00344 return -1;
00345 }
00346
00347 }
00348
00349
00350 void *THDFSSystem::OpenDirectory(const char * path)
00351 {
00352
00353
00354
00355 if (fFH == 0) {
00356 Error("OpenDirectory", "No filesystem handle (should never happen)");
00357 return 0;
00358 }
00359
00360 fDirp = 0;
00361
00362
00363
00364
00365
00366
00367
00368 hdfsFileInfo * dir = 0;
00369 if ((dir = hdfsGetPathInfo(fFH, path)) == 0) {
00370 return 0;
00371 }
00372 if (dir->mKind != kObjectKindDirectory) {
00373 return 0;
00374 }
00375
00376 fDirp = (void *)hdfsListDirectory(fFH, path, &fDirEntries);
00377 fDirCtr = 0;
00378
00379 fUrlp = new TUrl[fDirEntries];
00380
00381 return fDirp;
00382 }
00383
00384
00385 void THDFSSystem::FreeDirectory(void *dirp)
00386 {
00387
00388
00389 if (fFH == 0) {
00390 Error("FreeDirectory", "No filesystem handle (should never happen)");
00391 return;
00392 }
00393 if (dirp != fDirp) {
00394 Error("FreeDirectory", "invalid directory pointer (should never happen)");
00395 return;
00396 }
00397 if (fUrlp != 0) {
00398 delete fUrlp;
00399 }
00400
00401 hdfsFreeFileInfo((hdfsFileInfo *)fDirp, fDirEntries);
00402 fDirp=0;
00403 }
00404
00405
00406 const char *THDFSSystem::GetDirEntry(void *dirp)
00407 {
00408
00409
00410 if (fFH == 0) {
00411 Error("GetDirEntry", "No filesystem handle (should never happen)");
00412 return 0;
00413 }
00414 if (dirp != fDirp) {
00415 Error("GetDirEntry", "invalid directory pointer (should never happen)");
00416 return 0;
00417 }
00418 if (dirp == 0) {
00419 Error("GetDirEntry", "Passed an invalid directory pointer.");
00420 return 0;
00421 }
00422
00423 if (fDirCtr == fDirEntries-1) {
00424 return 0;
00425 }
00426
00427 hdfsFileInfo *fileInfo = ((hdfsFileInfo *)dirp) + fDirCtr;
00428 fUrlp[fDirCtr].SetUrl(fileInfo->mName);
00429 const char * result = fUrlp[fDirCtr].GetFile();
00430 TUrl tempUrl;
00431 tempUrl.SetUrl("hdfs:///");
00432 tempUrl.SetFile(result);
00433 fUrlp[fDirCtr].SetUrl(tempUrl.GetUrl());
00434 result = fUrlp[fDirCtr].GetUrl();
00435 fDirCtr++;
00436
00437 return result;
00438 }
00439
00440
00441 Int_t THDFSSystem::GetPathInfo(const char *path, FileStat_t &buf)
00442 {
00443
00444
00445
00446
00447
00448 if (fFH == 0) {
00449 Error("GetPathInfo", "No filesystem handle (should never happen)");
00450 return 1;
00451 }
00452 hdfsFileInfo *fileInfo = hdfsGetPathInfo(fFH, path);
00453
00454 if (fileInfo == 0)
00455 return 1;
00456
00457 buf.fDev = 0;
00458 buf.fIno = 0;
00459 buf.fMode = fileInfo->mPermissions;
00460 buf.fUid = gSystem->GetUid(fileInfo->mOwner);
00461 buf.fGid = gSystem->GetGid(fileInfo->mGroup);
00462 buf.fSize = fileInfo->mSize;
00463 buf.fMtime = fileInfo->mLastAccess;
00464 buf.fIsLink = kFALSE;
00465
00466 return 0;
00467 }
00468
00469
00470 Bool_t THDFSSystem::AccessPathName(const char *path, EAccessMode mode)
00471 {
00472
00473
00474
00475
00476 if (mode & kExecutePermission || mode & kWritePermission)
00477 return kTRUE;
00478
00479 if (fFH == 0) {
00480 Error("AccessPathName", "No filesystem handle (should never happen)");
00481 return kTRUE;
00482 }
00483
00484 if (hdfsExists(fFH, path) == 0)
00485 return kFALSE;
00486 else
00487 return kTRUE;
00488 }
00489
00490
00491 Int_t THDFSSystem::Unlink(const char * path)
00492 {
00493
00494
00495
00496 if (fFH == 0) {
00497 Error("Unlink", "No filesystem handle (should never happen)");
00498 return kTRUE;
00499 }
00500
00501 if (R__HDFS_ALLOW_CHANGES == kTRUE) {
00502 return hdfsDelete(fFH, path);
00503 } else {
00504 return -1;
00505 }
00506 }