00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
// Revision-control identification string for this source file (expanded "$Id$" keyword).
00016 const char *XrdClientCVSID = "$Id: XrdClient.cc 38011 2011-02-08 18:35:57Z ganis $";
00017
00018 #include "XrdClient/XrdClient.hh"
00019 #include "XrdClient/XrdClientDebug.hh"
00020 #include "XrdClient/XrdClientUrlSet.hh"
00021 #include "XrdClient/XrdClientConn.hh"
00022 #include "XrdClient/XrdClientEnv.hh"
00023 #include "XrdClient/XrdClientConnMgr.hh"
00024 #include "XrdClient/XrdClientSid.hh"
00025 #include "XrdClient/XrdClientMStream.hh"
00026 #include "XrdClient/XrdClientReadV.hh"
00027 #include "XrdOuc/XrdOucCRC.hh"
00028 #include "XrdClient/XrdClientReadAhead.hh"
00029 #include "XrdClient/XrdClientCallback.hh"
00030
00031 #include <stdio.h>
00032 #ifndef WIN32
00033 #include <unistd.h>
00034 #endif
00035
00036 #include <sys/types.h>
00037 #include <sys/stat.h>
00038 #include <fcntl.h>
00039 #include <signal.h>
00040
00041
// Definition of the class-wide semaphore, constructed with DFLT_MAXCONCURRENTOPENS.
// TryOpen() waits on it before starting an opener thread and
// TerminateOpenAttempt() posts it when an open attempt is over.
00042 XrdSysSemWait XrdClient::fConcOpenSem(DFLT_MAXCONCURRENTOPENS);
00043
00044
00045
00046
00047 void *FileOpenerThread(void *arg, XrdClientThread *thr) {
00048
00049 XrdClient *thisObj = (XrdClient *)arg;
00050
00051 thr->SetCancelDeferred();
00052 thr->SetCancelOn();
00053
00054
00055 bool res = thisObj->TryOpen(thisObj->fOpenPars.mode, thisObj->fOpenPars.options, false);
00056 if (thisObj->fXrdCcb) thisObj->fXrdCcb->OpenComplete(thisObj, thisObj->fXrdCcbArg, res);
00057
00058 return 0;
00059 }
00060
00061
00062
00063 XrdClient::XrdClient(const char *url,
00064 XrdClientCallback *XrdCcb,
00065 void *XrdCcbArg) : XrdClientAbs(XrdCcb, XrdCcbArg) {
00066
00067 fReadAheadMgr = 0;
00068 fReadTrimBlockSize = 0;
00069 fOpenerTh = 0;
00070 fOpenProgCnd = new XrdSysCondVar(0);
00071 fReadWaitData = new XrdSysCondVar(0);
00072
00073 memset(&fStatInfo, 0, sizeof(fStatInfo));
00074 memset(&fOpenPars, 0, sizeof(fOpenPars));
00075 memset(&fCounters, 0, sizeof(fCounters));
00076
00077
00078 DebugSetLevel(EnvGetLong(NAME_DEBUG));
00079
00080 if (!ConnectionManager)
00081 Info(XrdClientDebug::kUSERDEBUG,
00082 "Create",
00083 "(C) 2004-2010 by the Xrootd group. XrdClient $Revision$ - Xrootd version: " << XrdVSTRING);
00084
00085 #ifndef WIN32
00086 signal(SIGPIPE, SIG_IGN);
00087 #endif
00088
00089 fInitialUrl = url;
00090
00091 fConnModule = new XrdClientConn();
00092
00093
00094 if (!fConnModule) {
00095 Error("Create","Object creation failed.");
00096 abort();
00097 }
00098
00099 fConnModule->SetRedirHandler(this);
00100
00101 int CacheSize = EnvGetLong(NAME_READCACHESIZE);
00102 int RaSize = EnvGetLong(NAME_READAHEADSIZE);
00103 int RmPolicy = EnvGetLong(NAME_READCACHEBLKREMPOLICY);
00104 int ReadAheadStrategy = EnvGetLong(NAME_READAHEADSTRATEGY);
00105
00106 SetReadAheadStrategy(ReadAheadStrategy);
00107 SetBlockReadTrimming(EnvGetLong(NAME_READTRIMBLKSZ));
00108
00109 fUseCache = (CacheSize > 0);
00110 SetCacheParameters(CacheSize, RaSize, RmPolicy);
00111 }
00112
00113
00114 XrdClient::~XrdClient()
00115 {
00116
00117 if (IsOpen_wait()) Close();
00118
00119
00120 fOpenProgCnd->Lock();
00121
00122 if (fOpenerTh) {
00123 fOpenerTh->Cancel();
00124 fOpenerTh->Join();
00125 delete fOpenerTh;
00126 fOpenerTh = 0;
00127 }
00128
00129 fOpenProgCnd->UnLock();
00130
00131 if (fConnModule)
00132 delete fConnModule;
00133
00134 if (fReadAheadMgr) delete fReadAheadMgr;
00135 fReadAheadMgr = 0;
00136
00137 delete fReadWaitData;
00138 delete fOpenProgCnd;
00139
00140 PrintCounters();
00141 }
00142
00143
00144 bool XrdClient::IsOpen_inprogress()
00145 {
00146
00147 bool res;
00148
00149 if (!fOpenProgCnd) return false;
00150
00151 fOpenProgCnd->Lock();
00152 res = fOpenPars.inprogress;
00153 fOpenProgCnd->UnLock();
00154
00155 return res;
00156 };
00157
00158
00159 bool XrdClient::IsOpen_wait() {
00160 bool res;
00161
00162 if (!fOpenProgCnd) return false;
00163
00164 fOpenProgCnd->Lock();
00165
00166 if (fOpenPars.inprogress) {
00167 fOpenProgCnd->Wait();
00168
00169 if (fOpenerTh) {
00170
00171
00172 fOpenProgCnd->UnLock();
00173
00174 fOpenerTh->Join();
00175 delete fOpenerTh;
00176 fOpenerTh = 0;
00177
00178
00179 fOpenProgCnd->Lock();
00180 }
00181 }
00182 res = fOpenPars.opened;
00183 fOpenProgCnd->UnLock();
00184
00185 return res;
00186 };
00187
00188
00189 void XrdClient::TerminateOpenAttempt() {
00190 fOpenProgCnd->Lock();
00191
00192 fOpenPars.inprogress = false;
00193 fOpenProgCnd->Broadcast();
00194 fOpenProgCnd->UnLock();
00195
00196 fConcOpenSem.Post();
00197
00198
00199 }
00200
00201
00202 bool XrdClient::Open(kXR_unt16 mode, kXR_unt16 options, bool doitparallel) {
00203 short locallogid;
00204
00205
00206 fOpenPars.opened = FALSE;
00207 fOpenPars.options = options;
00208 fOpenPars.mode = mode;
00209
00210
00211
00212
00213
00214
00215 int connectMaxTry = EnvGetLong(NAME_FIRSTCONNECTMAXCNT);
00216
00217 fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT));
00218
00219
00220 XrdClientUrlSet urlArray(fInitialUrl);
00221 if (!urlArray.IsValid()) {
00222 Error("Open", "The URL provided is incorrect.");
00223 return FALSE;
00224 }
00225
00226 XrdClientUrlInfo unfo(fInitialUrl);
00227 if (unfo.File == "") {
00228 Error("Open", "The URL provided is incorrect.");
00229 return FALSE;
00230 }
00231
00232
00233
00234
00235
00236 urlArray.Rewind();
00237 locallogid = -1;
00238 int urlstried = 0;
00239 for (int connectTry = 0;
00240 (connectTry < connectMaxTry) && (!fConnModule->IsConnected());
00241 connectTry++) {
00242
00243 XrdClientUrlInfo *thisUrl = 0;
00244 urlstried = (urlstried == urlArray.Size()) ? 0 : urlstried;
00245
00246 if ( fConnModule->IsOpTimeLimitElapsed(time(0)) ) {
00247
00248 fConnModule->Disconnect(TRUE);
00249 Error("Open", "Access to server failed: Too much time elapsed without success.");
00250 break;
00251 }
00252
00253 bool nogoodurl = TRUE;
00254 while (urlArray.Size() > 0) {
00255
00256 unsigned int seed = XrdOucCRC::CRC32((const unsigned char*)unfo.File.c_str(), unfo.File.length());
00257
00258
00259 if ((thisUrl = urlArray.GetARandomUrl(seed))) {
00260
00261 if (fConnModule->CheckHostDomain(thisUrl->Host)) {
00262 nogoodurl = FALSE;
00263
00264 Info(XrdClientDebug::kHIDEBUG, "Open", "Trying to connect to " <<
00265 thisUrl->Host << ":" << thisUrl->Port << ". Connect try " <<
00266 connectTry+1);
00267 locallogid = fConnModule->Connect(*thisUrl, this);
00268
00269 urlstried++;
00270 break;
00271 } else {
00272
00273 urlArray.EraseUrl(thisUrl);
00274 continue;
00275 }
00276 }
00277 }
00278 if (nogoodurl) {
00279 Error("Open", "Access denied to all URL domains requested");
00280 break;
00281 }
00282
00283
00284 if (fConnModule->IsConnected()) {
00285
00286
00287
00288
00289 Info(XrdClientDebug::kHIDEBUG, "Open",
00290 "The logical connection id is " << fConnModule->GetLogConnID() <<
00291 ".");
00292
00293 fConnModule->SetUrl(*thisUrl);
00294 fUrl = *thisUrl;
00295
00296 Info(XrdClientDebug::kHIDEBUG, "Open", "Working url is " << thisUrl->GetUrl());
00297
00298
00299 if (!fConnModule->GetAccessToSrv()) {
00300
00301 if (fConnModule->GetRedirCnt() >= fConnModule->GetMaxRedirCnt()) {
00302
00303
00304 fConnModule->Disconnect(TRUE);
00305 Error("Open", "Access to server failed: Max redirections exceeded. This means typically 'too many errors'.");
00306 break;
00307 }
00308
00309 if (fConnModule->LastServerError.errnum == kXR_NotAuthorized) {
00310 if (urlstried == urlArray.Size()) {
00311
00312
00313 fConnModule->Disconnect(TRUE);
00314 XrdOucString msg(fConnModule->LastServerError.errmsg);
00315 msg.erasefromend(1);
00316 Error("Open", "Authentication failure: " << msg);
00317 connectTry = connectMaxTry;
00318 } else {
00319 XrdOucString msg(fConnModule->LastServerError.errmsg);
00320 msg.erasefromend(1);
00321 Info(XrdClientDebug::kHIDEBUG, "Open",
00322 "Authentication failure: " << msg);
00323 }
00324 } else {
00325 fConnModule->Disconnect(TRUE);
00326 Error("Open", "Access to server failed: error: " <<
00327 fConnModule->LastServerError.errnum << " (" <<
00328 fConnModule->LastServerError.errmsg << ") - retrying.");
00329 }
00330 }
00331 else {
00332 Info(XrdClientDebug::kUSERDEBUG, "Open", "Access to server granted.");
00333 break;
00334 }
00335 }
00336
00337
00338 Info(XrdClientDebug::kHIDEBUG, "Open", "Disconnecting.");
00339
00340 fConnModule->Disconnect(FALSE);
00341
00342 if (connectTry < connectMaxTry-1) {
00343
00344 if (DebugLevel() >= XrdClientDebug::kUSERDEBUG)
00345 Info(XrdClientDebug::kUSERDEBUG, "Open",
00346 "Connection attempt failed. Sleeping " <<
00347 EnvGetLong(NAME_RECONNECTWAIT) << " seconds.");
00348
00349 sleep(EnvGetLong(NAME_RECONNECTWAIT));
00350
00351 }
00352
00353 }
00354
00355 if (!fConnModule->IsConnected()) {
00356 return FALSE;
00357 }
00358
00359
00360
00361
00362
00363
00364 if ((fConnModule->GetServerType() != kSTRootd) &&
00365 (fConnModule->GetServerType() != kSTNone)) {
00366
00367
00368
00369
00370 Info(XrdClientDebug::kUSERDEBUG,
00371 "Open", "Opening the remote file " << fUrl.File);
00372
00373 if (!TryOpen(mode, options, doitparallel)) {
00374 Error("Open", "Error opening the file " <<
00375 fUrl.File << " on host " << fUrl.Host << ":" <<
00376 fUrl.Port);
00377
00378 if (fXrdCcb && !doitparallel)
00379 fXrdCcb->OpenComplete(this, fXrdCcbArg, false);
00380
00381 return FALSE;
00382
00383 } else {
00384
00385 if (doitparallel) {
00386 Info(XrdClientDebug::kUSERDEBUG, "Open", "File open in progress.");
00387 }
00388 else {
00389 Info(XrdClientDebug::kUSERDEBUG, "Open", "File opened succesfully.");
00390 if (fXrdCcb)
00391 fXrdCcb->OpenComplete(this, fXrdCcbArg, true);
00392 }
00393
00394 }
00395
00396 } else {
00397
00398 if (fConnModule->GetServerType() == kSTRootd) {
00399 return FALSE;
00400 }
00401 if (fConnModule->GetServerType() == kSTNone) {
00402 return FALSE;
00403 }
00404 }
00405
00406
00407 return TRUE;
00408
00409 }
00410
00411
00412 int XrdClient::Read(void *buf, long long offset, int len) {
00413 XrdClientIntvList cacheholes;
00414 long blkstowait;
00415 char *tmpbuf = (char *)buf;
00416
00417 Info( XrdClientDebug::kHIDEBUG, "Read",
00418 "Read(offs=" << offset <<
00419 ", len=" << len << ")" );
00420
00421 if (!IsOpen_wait()) {
00422 Error("Read", "File not opened.");
00423 return 0;
00424 }
00425
00426
00427 fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT));
00428
00429 fCounters.ReadRequests++;
00430
00431 int cachesize = 0;
00432 long long cachebytessubmitted = 0;
00433 long long cachebyteshit = 0;
00434 long long cachemisscount = 0;
00435 float cachemissrate = 0.0;
00436 long long cachereadreqcnt = 0;
00437 float cachebytesusefulness = 0.0;
00438 bool cachegood = fConnModule->GetCacheInfo(cachesize, cachebytessubmitted,
00439 cachebyteshit, cachemisscount,
00440 cachemissrate, cachereadreqcnt,
00441 cachebytesusefulness);
00442
00443
00444
00445
00446
00447 if (!fUseCache || !cachegood ||
00448 (cachesize < len) ||
00449 (fConnModule->GetServerProtocol() < 0x00000270) ) {
00450
00451
00452
00453 ClientRequest readFileRequest;
00454 memset( &readFileRequest, 0, sizeof(readFileRequest) );
00455 fConnModule->SetSID(readFileRequest.header.streamid);
00456 readFileRequest.read.requestid = kXR_read;
00457 memcpy( readFileRequest.read.fhandle, fHandle, sizeof(fHandle) );
00458 readFileRequest.read.offset = offset;
00459 readFileRequest.read.rlen = len;
00460 readFileRequest.read.dlen = 0;
00461
00462 if (!fConnModule->SendGenCommand(&readFileRequest, 0, 0, (void *)buf,
00463 FALSE, (char *)"ReadBuffer")) return 0;
00464
00465 fCounters.ReadBytes += fConnModule->LastServerResp.dlen;
00466 return fConnModule->LastServerResp.dlen;
00467 }
00468
00469
00470
00471
00472
00473 long long araoffset;
00474 long aralen;
00475 if (fReadAheadMgr && fUseCache &&
00476 !fReadAheadMgr->GetReadAheadHint(offset, len, araoffset, aralen, fReadTrimBlockSize) &&
00477 fConnModule->CacheWillFit(aralen)) {
00478
00479 long long o = araoffset;
00480 long l = aralen;
00481
00482 while (l > 0) {
00483 long ll = xrdmin(4*1024*1024, l);
00484 Read_Async(o, ll, true);
00485 l -= ll;
00486 o += ll;
00487 }
00488
00489 }
00490
00491 struct XrdClientStatInfo stinfo;
00492 Stat(&stinfo);
00493 len = xrdmax(0, xrdmin(len, stinfo.size - offset));
00494
00495 bool retrysync = false;
00496 long totbytes = 0;
00497 bool cachehit = true;
00498
00499
00500 do {
00501 fReadWaitData->Lock();
00502
00503 cacheholes.Clear();
00504 blkstowait = 0;
00505 long bytesgot = 0;
00506
00507
00508
00509
00510
00511 if (!retrysync) {
00512
00513
00514
00515
00516
00517 bytesgot = fConnModule->GetDataFromCache(tmpbuf+totbytes, offset + totbytes,
00518 len + offset - 1,
00519 true,
00520 cacheholes, blkstowait);
00521
00522 totbytes += bytesgot;
00523
00524 Info(XrdClientDebug::kHIDEBUG, "Read",
00525 "Cache response: got " << bytesgot << "@" << offset + totbytes << " bytes. Holes= " <<
00526 cacheholes.GetSize() << " Outstanding= " << blkstowait);
00527
00528
00529
00530 if( bytesgot >= len ) {
00531
00532
00533
00534 Info(XrdClientDebug::kHIDEBUG, "Read",
00535 "Found data in cache. len=" << len <<
00536 " offset=" << offset);
00537
00538 fReadWaitData->UnLock();
00539
00540 if (cachehit) fCounters.ReadHits++;
00541 fCounters.ReadBytes += len;
00542 return len;
00543 }
00544
00545
00546
00547
00548 for (int i = 0; i < cacheholes.GetSize(); i++) {
00549 long long o;
00550 long l;
00551
00552 o = cacheholes[i].beginoffs;
00553 l = cacheholes[i].endoffs - o + 1;
00554
00555
00556 Info( XrdClientDebug::kUSERDEBUG, "Read",
00557 "Hole in the cache: offs=" << o <<
00558 ", len=" << l );
00559
00560
00561 XrdClientReadAheadMgr::TrimReadRequest(o, l, 0, fReadTrimBlockSize);
00562
00563 Read_Async(o, l, false);
00564
00565 cachehit = false;
00566 }
00567
00568
00569 }
00570
00571
00572
00573
00574
00575
00576
00577 if (retrysync || (!bytesgot && !blkstowait && !cacheholes.GetSize())) {
00578
00579 cachehit = false;
00580
00581 fReadWaitData->UnLock();
00582
00583 memset(&fConnModule->LastServerError, 0, sizeof(fConnModule->LastServerError));
00584 fConnModule->LastServerError.errnum = kXR_noErrorYet;
00585
00586 Info( XrdClientDebug::kHIDEBUG, "Read",
00587 "Read(offs=" << offset <<
00588 ", len=" << len << "). Going sync." );
00589
00590 if ((fReadTrimBlockSize > 0) && !retrysync) {
00591 long long offs = offset;
00592 long l = len;
00593
00594 XrdClientReadAheadMgr::TrimReadRequest(offs, l, 0, fReadTrimBlockSize);
00595 Read_Async(offs, l, false);
00596 blkstowait++;
00597 } else {
00598
00599
00600 ClientRequest readFileRequest;
00601 memset( &readFileRequest, 0, sizeof(readFileRequest) );
00602 fConnModule->SetSID(readFileRequest.header.streamid);
00603 readFileRequest.read.requestid = kXR_read;
00604 memcpy( readFileRequest.read.fhandle, fHandle, sizeof(fHandle) );
00605 readFileRequest.read.offset = offset;
00606 readFileRequest.read.rlen = len;
00607 readFileRequest.read.dlen = 0;
00608
00609 if (!fConnModule->SendGenCommand(&readFileRequest, 0, 0, (void *)buf,
00610 FALSE, (char *)"ReadBuffer"))
00611 return 0;
00612
00613 fCounters.ReadBytes += len;
00614 return len;
00615 }
00616
00617 retrysync = false;
00618 }
00619
00620
00621
00622 if ( (blkstowait > 0) || cacheholes.GetSize() ) {
00623 Info( XrdClientDebug::kHIDEBUG, "Read",
00624 "Waiting " << blkstowait+cacheholes.GetSize() << "outstanding blocks." );
00625
00626 if (!fConnModule->IsPhyConnConnected() ||
00627 fReadWaitData->Wait( EnvGetLong(NAME_REQUESTTIMEOUT) ) ||
00628 (fConnModule->LastServerError.errnum != kXR_noErrorYet) ) {
00629
00630 fConnModule->LastServerError.errnum = kXR_noErrorYet;
00631
00632 if (DebugLevel() >= XrdClientDebug::kUSERDEBUG) {
00633 fConnModule->PrintCache();
00634
00635 Error( "Read",
00636 "Timeout or error waiting outstanding blocks. "
00637 "Retrying sync! "
00638 "List of outstanding reqs follows." );
00639 ConnectionManager->SidManager()->PrintoutOutstandingRequests();
00640 }
00641
00642 retrysync = true;
00643 }
00644
00645
00646
00647 }
00648
00649 fReadWaitData->UnLock();
00650
00651 } while ((blkstowait > 0) || cacheholes.GetSize());
00652
00653
00654 if (EnvGetLong(NAME_REMUSEDCACHEBLKS)) {
00655 Info(XrdClientDebug::kHIDEBUG, "Read",
00656 "Removing used blocks " << 0 << "->" << offset );
00657 fConnModule->RemoveDataFromCache(0, offset);
00658 }
00659
00660
00661 if (cachehit) fCounters.ReadHits++;
00662 fCounters.ReadBytes += len;
00663 return len;
00664 }
00665
00666
00667 kXR_int64 XrdClient::ReadV(char *buf, kXR_int64 *offsets, int *lens, int nbuf)
00668 {
00669
00670
00671 if (!nbuf) return 0;
00672
00673 if (!IsOpen_wait()) {
00674 Error("ReadV", "File not opened.");
00675 return 0;
00676 }
00677
00678
00679 if ( fConnModule->GetServerProtocol() < 0 ) {
00680 Info(XrdClientDebug::kHIDEBUG, "ReadV",
00681 "Problems retrieving protocol version run by the server" );
00682 return -1;
00683 }
00684
00685
00686 if ( fConnModule->GetServerProtocol() < 0x00000247 ) {
00687 Info(XrdClientDebug::kHIDEBUG, "ReadV",
00688 "The server is an old version " << fConnModule->GetServerProtocol() <<
00689 " and doesn't support vectored reading" );
00690 return -1;
00691 }
00692
00693 Stat(0);
00694
00695
00696
00697 fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT));
00698
00699
00700
00701 XrdClientVector<XrdClientReadVinfo> reqvect(nbuf);
00702
00703
00704 kXR_int64 maxbytes = 0;
00705 for (int ii = 0; ii < nbuf; ii++)
00706 maxbytes += lens[ii];
00707
00708
00709 int spltsize = 0;
00710 int reqsperstream = 0;
00711 XrdClientMStream::GetGoodSplitParameters(fConnModule, spltsize, reqsperstream, maxbytes);
00712
00713
00714
00715
00716
00717
00718 for (int ii = 0; ii < nbuf; ii++)
00719 XrdClientReadV::PreProcessChunkRequest(reqvect, offsets[ii], lens[ii],
00720 fStatInfo.size,
00721 spltsize );
00722
00723
00724 int i = 0, startitem = 0;
00725 kXR_int64 res = 0, bytesread = 0;
00726
00727 if (buf)
00728 fCounters.ReadVRequests++;
00729 else
00730 fCounters.ReadVAsyncRequests++;
00731
00732
00733 while ( i < reqvect.GetSize() ) {
00734
00735
00736
00737
00738
00739 kXR_int64 tmpbytes = 0;
00740
00741 int maxchunkcnt = READV_MAXCHUNKS;
00742 if (EnvGetLong(NAME_MULTISTREAMCNT) > 0)
00743 maxchunkcnt = reqvect.GetSize() / EnvGetLong(NAME_MULTISTREAMCNT)+1;
00744
00745 if (maxchunkcnt < 2) maxchunkcnt = 2;
00746 if (maxchunkcnt > READV_MAXCHUNKS) maxchunkcnt = READV_MAXCHUNKS;
00747
00748 int chunkcnt = 0;
00749 while ( i < reqvect.GetSize() ) {
00750 if (chunkcnt >= maxchunkcnt) break;
00751 if (tmpbytes + reqvect[i].len > spltsize) break;
00752 tmpbytes += reqvect[i].len;
00753 chunkcnt++;
00754 i++;
00755 }
00756
00757
00758 if (i-startitem == 1) {
00759 if (buf) {
00760
00761 fCounters.ReadVSubRequests++;
00762 fCounters.ReadVSubChunks++;
00763 fCounters.ReadVBytes += reqvect[startitem].len;
00764 res = Read(buf, reqvect[startitem].offset, reqvect[startitem].len);
00765
00766 } else {
00767
00768 fCounters.ReadVAsyncSubRequests++;
00769 fCounters.ReadVAsyncSubChunks++;
00770 fCounters.ReadVAsyncBytes += reqvect[startitem].len;
00771 Read_Async(reqvect[startitem].offset, reqvect[startitem].len, false);
00772 res = reqvect[startitem].len;
00773 }
00774 } else {
00775 if (buf) {
00776
00777 res = XrdClientReadV::ReqReadV(fConnModule, fHandle, buf+bytesread,
00778 reqvect, startitem, i-startitem,
00779 fConnModule->GetParallelStreamToUse(reqsperstream) );
00780 fCounters.ReadVSubRequests++;
00781 fCounters.ReadVSubChunks += i-startitem;
00782 fCounters.ReadVBytes += res;
00783 }
00784 else {
00785 res = XrdClientReadV::ReqReadV(fConnModule, fHandle, 0,
00786 reqvect, startitem, i-startitem,
00787 fConnModule->GetParallelStreamToUse(reqsperstream) );
00788 fCounters.ReadVAsyncSubRequests++;
00789 fCounters.ReadVAsyncSubChunks += i-startitem;
00790 fCounters.ReadVAsyncBytes += res;
00791 }
00792 }
00793
00794
00795 startitem = i;
00796
00797 if ( res < 0 )
00798 break;
00799
00800 bytesread += res;
00801
00802 }
00803
00804 if (!buf && !fConnModule->CacheWillFit(bytesread+bytesread/4)) {
00805 Info(XrdClientDebug::kUSERDEBUG, "ReadV",
00806 "Excessive async readv size " << bytesread+bytesread/4 << ". Fixing cache size." );
00807 SetCacheParameters(bytesread, -1, -1);
00808 }
00809
00810
00811
00812 return bytesread;
00813 }
00814
00815
00816
00817 bool XrdClient::Write(const void *buf, long long offset, int len) {
00818
00819 if (!IsOpen_wait()) {
00820 Error("WriteBuffer", "File not opened.");
00821 return FALSE;
00822 }
00823
00824
00825
00826 fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT));
00827
00828 fCounters.WrittenBytes += len;
00829 fCounters.WriteRequests++;
00830
00831
00832 ClientRequest writeFileRequest;
00833 memset( &writeFileRequest, 0, sizeof(writeFileRequest) );
00834 fConnModule->SetSID(writeFileRequest.header.streamid);
00835 writeFileRequest.write.requestid = kXR_write;
00836 memcpy( writeFileRequest.write.fhandle, fHandle, sizeof(fHandle) );
00837
00838 bool ret = false;
00839
00840 if (!fUseCache) {
00841
00842 writeFileRequest.write.pathid = 0;
00843 writeFileRequest.write.dlen = len;
00844 writeFileRequest.write.offset = offset;
00845 ret = fConnModule->SendGenCommand(&writeFileRequest, (void *)buf, 0, 0,
00846 FALSE, (char *)"Write");
00847
00848 if (ret && fStatInfo.stated)
00849 fStatInfo.size = xrdmax(fStatInfo.size, offset + len);
00850
00851 return ret;
00852 }
00853
00854
00855
00856
00857 if (!fConnModule->DoWriteSoftCheckPoint()) return false;
00858
00859 fConnModule->RemoveDataFromCache(offset, offset+len-1, true);
00860
00861 XrdClientVector<XrdClientMStream::ReadChunk> rl;
00862 XrdClientMStream::SplitReadRequest(fConnModule, offset, len, rl);
00863 kXR_char *cbuf = (kXR_char *)buf;
00864 int writtenok = 0;
00865
00866 for (int i = 0; i < rl.GetSize(); i++) {
00867
00868 writeFileRequest.write.offset = rl[i].offset;
00869 writeFileRequest.write.dlen = rl[i].len;
00870 writeFileRequest.write.pathid = rl[i].streamtosend;
00871
00872
00873
00874
00875 XReqErrorType b;
00876 int cnt = 0;
00877 do {
00878 b = fConnModule->WriteToServer_Async(&writeFileRequest, (kXR_char *)buf+(rl[i].offset-offset), rl[i].streamtosend);
00879 ret = (b == kOK);
00880 if (b != kNOMORESTREAMS) break;
00881
00882
00883
00884
00885
00886
00887 if (!fConnModule->DoWriteHardCheckPoint()) break;
00888 } while (cnt < 10);
00889
00890 if (b != kOK) {
00891
00892
00893
00894
00895 writeFileRequest.write.pathid = 0;
00896 ret = fConnModule->SendGenCommand(&writeFileRequest, (kXR_char *)buf+(rl[i].offset-offset), 0, 0,
00897 FALSE, (char *)"Write");
00898 if (!ret) break;
00899 }
00900 writtenok += rl[i].len;
00901 cbuf += rl[i].len;
00902 }
00903
00904 if (ret && fStatInfo.stated)
00905 fStatInfo.size = xrdmax(fStatInfo.size, offset + writtenok);
00906
00907 return ret;
00908 }
00909
00910
00911
00912 bool XrdClient::Sync()
00913 {
00914
00915
00916
00917 if (!IsOpen_wait()) {
00918 Error("Sync", "File not opened.");
00919 return FALSE;
00920 }
00921
00922 if (!fConnModule->DoWriteHardCheckPoint()) return false;
00923
00924
00925
00926 fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT));
00927
00928
00929 ClientRequest flushFileRequest;
00930 memset( &flushFileRequest, 0, sizeof(flushFileRequest) );
00931
00932 fConnModule->SetSID(flushFileRequest.header.streamid);
00933
00934 flushFileRequest.sync.requestid = kXR_sync;
00935
00936 memcpy(flushFileRequest.sync.fhandle, fHandle, sizeof(fHandle));
00937
00938 flushFileRequest.sync.dlen = 0;
00939
00940 return fConnModule->SendGenCommand(&flushFileRequest, 0, 0, 0,
00941 FALSE, (char *)"Sync");
00942
00943 }
00944
00945
00946 bool XrdClient::TryOpen(kXR_unt16 mode, kXR_unt16 options, bool doitparallel) {
00947
00948 int thrst = 0;
00949
00950 fOpenPars.inprogress = true;
00951
00952 if (doitparallel) {
00953
00954 for (int i = 0; i < DFLT_MAXCONCURRENTOPENS; i++) {
00955
00956 fConcOpenSem.Wait();
00957 fOpenerTh = new XrdClientThread(FileOpenerThread);
00958
00959 thrst = fOpenerTh->Run(this);
00960 if (!thrst) {
00961
00962
00963
00964
00965
00966 return true;
00967 }
00968
00969
00970
00971 Error("XrdClient", "Parallel open thread start failed. Low system"
00972 " resources? Res=" << thrst << " Count=" << i);
00973 delete fOpenerTh;
00974 fOpenerTh = 0;
00975
00976 }
00977
00978
00979
00980 for (int i = 0; i < DFLT_MAXCONCURRENTOPENS; i++) fConcOpenSem.Post();
00981
00982 Error("XrdClient", "All the parallel open thread start attempts failed."
00983 " Desperate situation. Going sync.");
00984
00985 doitparallel = false;
00986 }
00987
00988
00989 bool lowopenRes = LowOpen(fUrl.File.c_str(), mode, options);
00990 if (lowopenRes) {
00991
00992
00993 XrdClientMStream::EstablishParallelStreams(fConnModule);
00994 int retc;
00995
00996 if (!fConnModule->IsConnected()) {
00997 fOpenPars.opened = false;
00998 retc = false;
00999 } else retc = true;
01000
01001 TerminateOpenAttempt();
01002 return retc;
01003
01004 }
01005
01006
01007
01008 if ( (fConnModule->LastServerResp.status != kXR_error) ||
01009 ((fConnModule->LastServerResp.status == kXR_error) &&
01010 (fConnModule->LastServerError.errnum != kXR_NotFound)) ){
01011
01012 TerminateOpenAttempt();
01013
01014 return FALSE;
01015 }
01016
01017
01018
01019
01020
01021
01022
01023 if (fConnModule->GetLBSUrl() &&
01024 ( (fConnModule->GetCurrentUrl().Host != fConnModule->GetLBSUrl()->Host) ||
01025 (fConnModule->GetCurrentUrl().Port != fConnModule->GetLBSUrl()->Port) ) ) {
01026 XrdOucString opinfo;
01027
01028 opinfo = "&tried=" + fConnModule->GetCurrentUrl().Host;
01029
01030 Info(XrdClientDebug::kUSERDEBUG,
01031 "Open", "Back to " << fConnModule->GetLBSUrl()->Host <<
01032 ". Refreshing cache. Opaque info: " << opinfo);
01033
01034
01035
01036
01037 fConnModule->Disconnect(FALSE);
01038
01039 if ( (fConnModule->GoToAnotherServer(*fConnModule->GetLBSUrl()) == kOK) &&
01040 LowOpen(fUrl.File.c_str(), mode, options | kXR_refresh,
01041 (char *)opinfo.c_str() ) ) {
01042
01043
01044 XrdClientMStream::EstablishParallelStreams(fConnModule);
01045
01046 TerminateOpenAttempt();
01047
01048 return TRUE;
01049 }
01050 else {
01051
01052 Error("Open", "Error opening the file.");
01053 TerminateOpenAttempt();
01054
01055 return FALSE;
01056 }
01057
01058 }
01059
01060 TerminateOpenAttempt();
01061 return FALSE;
01062
01063 }
01064
01065
01066 bool XrdClient::LowOpen(const char *file, kXR_unt16 mode, kXR_unt16 options,
01067 char *additionalquery) {
01068
01069
01070 XrdOucString finalfilename(file);
01071
01072 if ((fConnModule->fRedirOpaque.length() > 0) || additionalquery) {
01073 finalfilename += "?";
01074
01075 if (fConnModule->fRedirOpaque.length() > 0)
01076 finalfilename += fConnModule->fRedirOpaque;
01077
01078 if (additionalquery)
01079 finalfilename += additionalquery;
01080 }
01081
01082
01083
01084
01085
01086
01087 ClientRequest openFileRequest;
01088
01089 char buf[1024];
01090 struct ServerResponseBody_Open *openresp = (struct ServerResponseBody_Open *)buf;
01091
01092 memset(&openFileRequest, 0, sizeof(openFileRequest));
01093
01094 fConnModule->SetSID(openFileRequest.header.streamid);
01095
01096 openFileRequest.header.requestid = kXR_open;
01097
01098 openFileRequest.open.options = options | kXR_retstat;
01099
01100
01101 openFileRequest.open.mode = mode;
01102
01103
01104
01105 openFileRequest.open.dlen = finalfilename.length();
01106
01107
01108 bool resp = fConnModule->SendGenCommand(&openFileRequest,
01109 (const void *)finalfilename.c_str(),
01110 0, openresp, false, (char *)"Open");
01111
01112 if (resp && (fConnModule->LastServerResp.status == 0)) {
01113
01114 if (fConnModule->LastServerResp.dlen >= (kXR_int32)sizeof(fHandle)) {
01115
01116 memcpy( fHandle, openresp->fhandle, sizeof(fHandle) );
01117
01118 fOpenPars.opened = TRUE;
01119 fOpenPars.options = options;
01120 fOpenPars.mode = mode;
01121 }
01122 else
01123 Error("Open",
01124 "Server did not return a filehandle. Protocol error.");
01125
01126 if (fConnModule->LastServerResp.dlen > 12) {
01127
01128 Info(XrdClientDebug::kHIDEBUG,
01129 "Open", "Returned stats=" << ((char *)openresp + sizeof(struct ServerResponseBody_Open)));
01130
01131 sscanf((char *)openresp + sizeof(struct ServerResponseBody_Open), "%ld %lld %ld %ld",
01132 &fStatInfo.id,
01133 &fStatInfo.size,
01134 &fStatInfo.flags,
01135 &fStatInfo.modtime);
01136
01137 fStatInfo.stated = true;
01138 }
01139
01140 }
01141
01142
01143 return fOpenPars.opened;
01144 }
01145
01146
01147 bool XrdClient::Stat(struct XrdClientStatInfo *stinfo, bool force) {
01148
01149 if (!force && fStatInfo.stated) {
01150 if (stinfo)
01151 memcpy(stinfo, &fStatInfo, sizeof(fStatInfo));
01152 return TRUE;
01153 }
01154
01155 if (!IsOpen_wait()) {
01156 Error("Stat", "File not opened.");
01157 return FALSE;
01158 }
01159
01160 if (force && !Sync()) return false;
01161
01162
01163 ClientRequest statFileRequest;
01164
01165 memset(&statFileRequest, 0, sizeof(ClientRequest));
01166
01167 fConnModule->SetSID(statFileRequest.header.streamid);
01168
01169 statFileRequest.stat.requestid = kXR_stat;
01170 memset(statFileRequest.stat.reserved, 0,
01171 sizeof(statFileRequest.stat.reserved));
01172
01173 statFileRequest.stat.dlen = fUrl.File.length();
01174
01175 char fStats[2048];
01176 memset(fStats, 0, 2048);
01177
01178 bool ok = fConnModule->SendGenCommand(&statFileRequest,
01179 (const char*)fUrl.File.c_str(),
01180 0, fStats , FALSE, (char *)"Stat");
01181
01182 if (ok && (fConnModule->LastServerResp.status == 0) ) {
01183
01184 Info(XrdClientDebug::kHIDEBUG,
01185 "Stat", "Returned stats=" << fStats);
01186
01187 sscanf(fStats, "%ld %lld %ld %ld",
01188 &fStatInfo.id,
01189 &fStatInfo.size,
01190 &fStatInfo.flags,
01191 &fStatInfo.modtime);
01192
01193 if (stinfo)
01194 memcpy(stinfo, &fStatInfo, sizeof(fStatInfo));
01195
01196 fStatInfo.stated = true;
01197 }
01198
01199 return ok;
01200 }
01201
01202
01203 bool XrdClient::Close() {
01204
01205 if (!IsOpen_wait()) {
01206 Info(XrdClientDebug::kUSERDEBUG, "Close", "File not opened.");
01207 return TRUE;
01208 }
01209
01210 ClientRequest closeFileRequest;
01211
01212
01213 fConnModule->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT));
01214
01215 memset(&closeFileRequest, 0, sizeof(closeFileRequest) );
01216
01217 fConnModule->SetSID(closeFileRequest.header.streamid);
01218
01219 closeFileRequest.close.requestid = kXR_close;
01220 memcpy(closeFileRequest.close.fhandle, fHandle, sizeof(fHandle) );
01221 closeFileRequest.close.dlen = 0;
01222
01223
01224
01225 if (IsOpenedForWrite())
01226 fConnModule->DoWriteHardCheckPoint();
01227
01228 fConnModule->SendGenCommand(&closeFileRequest,
01229 0,
01230 0, 0 , FALSE, (char *)"Close");
01231
01232
01233 fOpenPars.opened = FALSE;
01234
01235 return TRUE;
01236 }
01237
01238
01239
01240 bool XrdClient::OpenFileWhenRedirected(char *newfhandle, bool &wasopen)
01241 {
01242
01243
01244
01245 wasopen = fOpenPars.opened;
01246
01247 if (!fOpenPars.opened)
01248 return TRUE;
01249
01250 fOpenPars.opened = FALSE;
01251
01252 Info(XrdClientDebug::kHIDEBUG,
01253 "OpenFileWhenRedirected", "Trying to reopen the same file." );
01254
01255 kXR_unt16 options = fOpenPars.options;
01256
01257 if (fOpenPars.options & kXR_delete) {
01258 Info(XrdClientDebug::kHIDEBUG,
01259 "OpenFileWhenRedirected", "Stripping off the 'delete' option." );
01260
01261 options &= !kXR_delete;
01262 options |= kXR_open_updt;
01263 }
01264
01265 if (fOpenPars.options & kXR_new) {
01266 Info(XrdClientDebug::kHIDEBUG,
01267 "OpenFileWhenRedirected", "Stripping off the 'new' option." );
01268
01269 options &= !kXR_new;
01270 options |= kXR_open_updt;
01271 }
01272
01273 if ( TryOpen(fOpenPars.mode, options, false) ) {
01274
01275 fOpenPars.opened = TRUE;
01276
01277 Info(XrdClientDebug::kHIDEBUG,
01278 "OpenFileWhenRedirected",
01279 "Open successful." );
01280
01281 memcpy(newfhandle, fHandle, sizeof(fHandle));
01282
01283 return TRUE;
01284 } else {
01285 Error("OpenFileWhenRedirected",
01286 "File open failed.");
01287
01288 return FALSE;
01289 }
01290 }
01291
01292
01293 bool XrdClient::Copy(const char *localpath) {
01294
01295 if (!IsOpen_wait()) {
01296 Error("Copy", "File not opened.");
01297 return FALSE;
01298 }
01299
01300 Stat(0);
01301 int f = open(localpath, O_CREAT | O_RDWR, 0);
01302 if (f < 0) {
01303 Error("Copy", "Error opening local file.");
01304 return FALSE;
01305 }
01306
01307 void *buf = malloc(100000);
01308 long long offs = 0;
01309 int nr = 1;
01310
01311 while ((nr > 0) && (offs < fStatInfo.size))
01312 if ( (nr = Read(buf, offs, 100000)) )
01313 offs += write(f, buf, nr);
01314
01315 close(f);
01316 free(buf);
01317
01318 return TRUE;
01319 }
01320
01321
01322 UnsolRespProcResult XrdClient::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *sender,
01323 XrdClientMessage *unsolmsg) {
01324
01325
01326
01327
01328
01329
01330 if ( unsolmsg->GetStatusCode() != XrdClientMessage::kXrdMSC_ok ) {
01331 Info(XrdClientDebug::kHIDEBUG,
01332 "ProcessUnsolicitedMsg", "Incoming unsolicited communication error message." );
01333 }
01334 else {
01335 Info(XrdClientDebug::kHIDEBUG,
01336 "ProcessUnsolicitedMsg", "Incoming unsolicited response from streamid " <<
01337 unsolmsg->HeaderSID() );
01338 }
01339
01340
01341
01342 if (unsolmsg->IsAttn()) {
01343 struct ServerResponseBody_Attn *attnbody;
01344
01345 attnbody = (struct ServerResponseBody_Attn *)unsolmsg->GetData();
01346
01347 int actnum = (attnbody) ? (attnbody->actnum) : 0;
01348
01349
01350 switch (actnum) {
01351
01352 case kXR_asyncdi:
01353
01354
01355 struct ServerResponseBody_Attn_asyncdi *di;
01356 di = (struct ServerResponseBody_Attn_asyncdi *)unsolmsg->GetData();
01357
01358
01359 if (di) {
01360 Info(XrdClientDebug::kUSERDEBUG,
01361 "ProcessUnsolicitedMsg", "Requested Disconnection + Reconnect in " <<
01362 ntohl(di->wsec) << " seconds.");
01363
01364 fConnModule->SetRequestedDestHost((char *)fUrl.Host.c_str(), fUrl.Port);
01365 fConnModule->SetREQDelayedConnectState(ntohl(di->wsec));
01366 }
01367
01368
01369 return kUNSOL_CONTINUE;
01370 break;
01371
01372 case kXR_asyncrd:
01373
01374
01375 struct ServerResponseBody_Attn_asyncrd *rd;
01376 rd = (struct ServerResponseBody_Attn_asyncrd *)unsolmsg->GetData();
01377
01378
01379 if (rd && (strlen(rd->host) > 0)) {
01380 Info(XrdClientDebug::kUSERDEBUG,
01381 "ProcessUnsolicitedMsg", "Requested redir to " << rd->host <<
01382 ":" << ntohl(rd->port));
01383
01384 fConnModule->SetRequestedDestHost(rd->host, ntohl(rd->port));
01385 }
01386
01387
01388 return kUNSOL_CONTINUE;
01389 break;
01390
01391 case kXR_asyncwt:
01392
01393
01394 struct ServerResponseBody_Attn_asyncwt *wt;
01395 wt = (struct ServerResponseBody_Attn_asyncwt *)unsolmsg->GetData();
01396
01397 if (wt) {
01398 Info(XrdClientDebug::kUSERDEBUG,
01399 "ProcessUnsolicitedMsg", "Pausing client for " << ntohl(wt->wsec) <<
01400 " seconds.");
01401
01402 fConnModule->SetREQPauseState(ntohl(wt->wsec));
01403 }
01404
01405
01406 return kUNSOL_CONTINUE;
01407 break;
01408
01409 case kXR_asyncgo:
01410
01411
01412 Info(XrdClientDebug::kUSERDEBUG,
01413 "ProcessUnsolicitedMsg", "Resuming from pause.");
01414
01415 fConnModule->SetREQPauseState(0);
01416
01417
01418 return kUNSOL_CONTINUE;
01419 break;
01420
01421 case kXR_asynresp:
01422
01423
01424
01425
01426
01427 return fConnModule->ProcessAsynResp(unsolmsg);
01428 break;
01429
01430 default:
01431
01432 Info(XrdClientDebug::kUSERDEBUG,
01433 "ProcessUnsolicitedMsg", "Empty message");
01434
01435
01436 return kUNSOL_CONTINUE;
01437
01438 }
01439
01440
01441 }
01442 else
01443
01444 if (unsolmsg->GetStatusCode() != XrdClientMessage::kXrdMSC_ok){
01445
01446
01447 fReadWaitData->Broadcast();
01448 TerminateOpenAttempt();
01449
01450 return fConnModule->ProcessAsynResp(unsolmsg);
01451 }
01452 else
01453
01454 if ( ConnectionManager->SidManager()->JoinedSids(fConnModule->GetStreamID(),
01455 unsolmsg->HeaderSID()) ) {
01456 struct SidInfo *si =
01457 ConnectionManager->SidManager()->GetSidInfo(unsolmsg->HeaderSID());
01458
01459 if (!si) {
01460 Error("ProcessUnsolicitedMsg",
01461 "Orphaned streamid detected: " << unsolmsg->HeaderSID());
01462 return kUNSOL_DISPOSE;
01463 }
01464
01465
01466 ClientRequest *req = &(si->outstandingreq);
01467
01468 Info(XrdClientDebug::kHIDEBUG,
01469 "ProcessUnsolicitedMsg",
01470 "Processing async response from streamid " <<
01471 unsolmsg->HeaderSID() << " father=" <<
01472 si->fathersid );
01473
01474
01475 if ( (unsolmsg->HeaderStatus() == kXR_oksofar) ||
01476 (unsolmsg->HeaderStatus() == kXR_ok) ) {
01477
01478 switch (req->header.requestid) {
01479
01480 case kXR_read: {
01481 long long offs = req->read.offset + si->reqbyteprogress;
01482
01483 Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg",
01484 "Putting kXR_read data into cache. Offset=" <<
01485 offs <<
01486 " len " <<
01487 unsolmsg->fHdr.dlen);
01488
01489 {
01490
01491 XrdSysCondVarHelper cndh(fReadWaitData);
01492
01493
01494 fConnModule->SubmitDataToCache(unsolmsg, offs,
01495 offs + unsolmsg->fHdr.dlen - 1);
01496
01497 }
01498 si->reqbyteprogress += unsolmsg->fHdr.dlen;
01499
01500
01501 fReadWaitData->Broadcast();
01502
01503 if (unsolmsg->HeaderStatus() == kXR_ok) return kUNSOL_DISPOSE;
01504 else return kUNSOL_KEEP;
01505
01506 break;
01507 }
01508
01509 case kXR_readv: {
01510
01511 Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg",
01512 "Putting kXR_readV data into cache. " <<
01513 " len " <<
01514 unsolmsg->fHdr.dlen);
01515 {
01516
01517 XrdSysCondVarHelper cndh(fReadWaitData);
01518
01519 XrdClientReadV::SubmitToCacheReadVResp(fConnModule, (char *)unsolmsg->DonateData(),
01520 unsolmsg->fHdr.dlen);
01521 }
01522
01523 fReadWaitData->Broadcast();
01524
01525 if (unsolmsg->HeaderStatus() == kXR_ok) return kUNSOL_DISPOSE;
01526 else return kUNSOL_KEEP;
01527
01528 break;
01529 }
01530
01531
01532 case kXR_write: {
01533 Info(XrdClientDebug::kHIDEBUG, "ProcessUnsolicitedMsg",
01534 "Got positive ack for write req " << req->header.dlen <<
01535 "@" << req->write.offset);
01536
01537
01538
01539 fConnModule->UnPinCacheBlk(req->write.offset, req->write.offset+req->header.dlen);
01540
01541
01542 if (EnvGetLong(NAME_PURGEWRITTENBLOCKS))
01543 fConnModule->RemoveDataFromCache(req->write.offset, req->write.offset+req->header.dlen-1, true);
01544
01545
01546 return kUNSOL_DISPOSE;
01547 }
01548 }
01549
01550 }
01551 else {
01552
01553
01554 switch (req->header.requestid) {
01555
01556 case kXR_read: {
01557
01558
01559 Error("ProcessUnsolicitedMsg",
01560 "Got a kxr_read error. Req offset=" <<
01561 req->read.offset <<
01562 " len=" <<
01563 req->read.rlen);
01564
01565 {
01566
01567 XrdSysCondVarHelper cndh(fReadWaitData);
01568
01569
01570
01571
01572 fConnModule->RemoveDataFromCache(req->read.offset,
01573 req->read.offset + req->read.rlen - 1, true);
01574
01575 }
01576
01577
01578
01579 struct ServerResponseBody_Error *body_err;
01580 body_err = (struct ServerResponseBody_Error *)(unsolmsg->GetData());
01581 if (body_err)
01582 Info(XrdClientDebug::kNODEBUG, "ProcessUnsolicitedMsg", "Server declared: " <<
01583 (const char*)body_err->errmsg << "(error code: " << ntohl(body_err->errnum) << ")");
01584
01585
01586 memset(&fConnModule->LastServerError, 0, sizeof(fConnModule->LastServerError));
01587 memcpy(&fConnModule->LastServerError, body_err,
01588 xrdmin(sizeof(fConnModule->LastServerError), (unsigned)unsolmsg->DataLen()) );
01589 fConnModule->LastServerError.errnum = ntohl(body_err->errnum);
01590
01591
01592
01593 fReadWaitData->Broadcast();
01594
01595
01596 return kUNSOL_CONTINUE;
01597
01598 break;
01599 }
01600 case kXR_write: {
01601 Error("ProcessUnsolicitedMsg",
01602 "Got a kxr_write error. Req offset=" <<
01603 req->write.offset <<
01604 " len=" <<
01605 req->write.dlen);
01606
01607
01608
01609 struct ServerResponseBody_Error *body_err;
01610 body_err = (struct ServerResponseBody_Error *)(unsolmsg->GetData());
01611 if (body_err) {
01612 Info(XrdClientDebug::kNODEBUG, "ProcessUnsolicitedMsg", "Server declared: " <<
01613 (const char*)body_err->errmsg << "(error code: " << ntohl(body_err->errnum) << ") writing " <<
01614 req->write.dlen << "@" << req->write.offset);
01615
01616
01617 memset(&fConnModule->LastServerError, 0, sizeof(fConnModule->LastServerError));
01618 memcpy(&fConnModule->LastServerError, body_err,
01619 xrdmin(sizeof(fConnModule->LastServerError), (unsigned)unsolmsg->DataLen()) );
01620 fConnModule->LastServerError.errnum = ntohl(body_err->errnum);
01621
01622
01623 ConnectionManager->SidManager()->ReportSidResp(unsolmsg->HeaderSID(),
01624 unsolmsg->GetStatusCode(),
01625 ntohl(body_err->errnum),
01626 body_err->errmsg);
01627 }
01628 else
01629 ConnectionManager->SidManager()->ReportSidResp(unsolmsg->HeaderSID(),
01630 unsolmsg->GetStatusCode(),
01631 kXR_noErrorYet,
01632 0);
01633
01634
01635 fReadWaitData->Broadcast();
01636
01637
01638
01639 return kUNSOL_KEEP;
01640
01641 break;
01642 }
01643
01644 }
01645 }
01646
01647
01648
01649
01650
01651
01652 }
01653
01654
01655 return kUNSOL_CONTINUE;
01656 }
01657
01658 XReqErrorType XrdClient::Read_Async(long long offset, int len, bool updatecounters) {
01659
01660 if (!IsOpen_wait()) {
01661 Error("Read", "File not opened.");
01662 return kGENERICERR;
01663 }
01664
01665 Stat(0);
01666 len = xrdmin(fStatInfo.size - offset, len);
01667
01668 if (len <= 0) return kOK;
01669
01670 if (fUseCache)
01671 fConnModule->SubmitPlaceholderToCache(offset, offset+len-1);
01672 else return kOK;
01673
01674 if (updatecounters) {
01675 fCounters.ReadAsyncRequests++;
01676 fCounters.ReadAsyncBytes += len;
01677 }
01678
01679
01680 ClientRequest readFileRequest;
01681 memset( &readFileRequest, 0, sizeof(readFileRequest) );
01682
01683
01684 readFileRequest.read.requestid = kXR_read;
01685 memcpy( readFileRequest.read.fhandle, fHandle, sizeof(fHandle) );
01686 readFileRequest.read.offset = offset;
01687 readFileRequest.read.rlen = len;
01688 readFileRequest.read.dlen = 0;
01689
01690 Info(XrdClientDebug::kHIDEBUG, "Read_Async",
01691 "Requesting to read " <<
01692 readFileRequest.read.rlen <<
01693 " bytes of data at offset " <<
01694 readFileRequest.read.offset);
01695
01696
01697 XrdClientVector<XrdClientMStream::ReadChunk> chunks;
01698 XReqErrorType ok = kOK;
01699
01700 if (XrdClientMStream::SplitReadRequest(fConnModule, offset, len,
01701 chunks) ) {
01702
01703 for (int i = 0; i < chunks.GetSize(); i++) {
01704 XrdClientMStream::ReadChunk *c;
01705
01706 read_args args;
01707 memset(&args, 0, sizeof(args));
01708
01709 c = &chunks[i];
01710 args.pathid = c->streamtosend;
01711
01712 Info(XrdClientDebug::kHIDEBUG, "Read_Async",
01713 "Requesting pathid " << c->streamtosend);
01714
01715 readFileRequest.read.offset = c->offset;
01716 readFileRequest.read.rlen = c->len;
01717
01718 if (args.pathid != 0) {
01719 readFileRequest.read.dlen = sizeof(read_args);
01720 ok = fConnModule->WriteToServer_Async(&readFileRequest, &args,
01721 0);
01722 }
01723 else {
01724 readFileRequest.read.dlen = 0;
01725 ok = fConnModule->WriteToServer_Async(&readFileRequest, 0,
01726 0);
01727 }
01728
01729
01730 if (ok != kOK) break;
01731 }
01732 }
01733 else
01734 return (fConnModule->WriteToServer_Async(&readFileRequest, 0));
01735
01736 return ok;
01737
01738 }
01739
01740
01741
01742 bool XrdClient::Truncate(long long len) {
01743
01744 if (!IsOpen_wait()) {
01745 Info(XrdClientDebug::kUSERDEBUG, "Truncate", "File not opened.");
01746 return true;
01747 }
01748
01749 ClientRequest truncFileRequest;
01750
01751 memset(&truncFileRequest, 0, sizeof(truncFileRequest) );
01752
01753 fConnModule->SetSID(truncFileRequest.header.streamid);
01754
01755 truncFileRequest.truncate.requestid = kXR_truncate;
01756 memcpy(truncFileRequest.truncate.fhandle, fHandle, sizeof(fHandle) );
01757 truncFileRequest.truncate.offset = len;
01758
01759 bool ok = fConnModule->SendGenCommand(&truncFileRequest,
01760 0,
01761 0, 0 , FALSE, (char *)"Truncate");
01762
01763 if (ok && fStatInfo.stated) fStatInfo.size = len;
01764
01765 return ok;
01766 }
01767
01768
01769
01770
01771
01772 void XrdClient::WaitForNewAsyncData() {
01773 XrdSysCondVarHelper cndh(fReadWaitData);
01774
01775 fReadWaitData->Wait();
01776
01777 }
01778
01779
01780 bool XrdClient::UseCache(bool u)
01781 {
01782
01783
01784
01785 bool r = fUseCache;
01786
01787 if (!u) {
01788 fUseCache = false;
01789 } else {
01790 int size;
01791 long long bytessubmitted, byteshit, misscount, readreqcnt;
01792 float missrate, bytesusefulness;
01793
01794
01795 if ( fConnModule &&
01796 fConnModule->GetCacheInfo(size, bytessubmitted, byteshit, misscount, missrate, readreqcnt, bytesusefulness) &&
01797 size )
01798 fUseCache = true;
01799 }
01800
01801
01802 return r;
01803 }
01804
01805
01806
01807
01808
01809 void XrdClient::SetCacheParameters(int CacheSize, int ReadAheadSize, int RmPolicy) {
01810 if (fConnModule) {
01811 if (CacheSize >= 0) fConnModule->SetCacheSize(CacheSize);
01812 if (RmPolicy >= 0) fConnModule->SetCacheRmPolicy(RmPolicy);
01813 }
01814
01815 if ((ReadAheadSize >= 0) && fReadAheadMgr) fReadAheadMgr->SetRASize(ReadAheadSize);
01816 }
01817
01818
01819 void XrdClient::SetReadAheadStrategy(int strategy) {
01820 if (!fConnModule) return;
01821
01822 if (fReadAheadMgr && fReadAheadMgr->GetCurrentStrategy() != (XrdClientReadAheadMgr::XrdClient_RAStrategy)strategy) {
01823
01824 delete fReadAheadMgr;
01825 fReadAheadMgr = 0;
01826 }
01827
01828 if (!fReadAheadMgr)
01829 fReadAheadMgr = XrdClientReadAheadMgr::CreateReadAheadMgr((XrdClientReadAheadMgr::XrdClient_RAStrategy)strategy);
01830 }
01831
01832
01833
01834
01835
01836 void XrdClient::SetBlockReadTrimming(int blocksize) {
01837 blocksize = blocksize >> 9;
01838 blocksize = blocksize << 9;
01839 if (blocksize < 512) blocksize = 512;
01840
01841 fReadTrimBlockSize = blocksize;
01842 }
01843
01844
01845 bool XrdClient::GetCacheInfo(
01846
01847 int &size,
01848
01849
01850 long long &bytessubmitted,
01851
01852
01853 long long &byteshit,
01854
01855
01856
01857 long long &misscount,
01858
01859
01860 float &missrate,
01861
01862
01863 long long &readreqcnt,
01864
01865
01866 float &bytesusefulness
01867 ) {
01868 if (!fConnModule) return false;
01869
01870
01871 if (!fConnModule->GetCacheInfo(size,
01872 bytessubmitted,
01873 byteshit,
01874 misscount,
01875 missrate,
01876 readreqcnt,
01877 bytesusefulness))
01878 return false;
01879
01880 return true;
01881 }
01882
01883
01884 bool XrdClient::GetCounters( XrdClientCounters *cnt ) {
01885
01886 fCounters.ReadMisses = fCounters.ReadRequests-fCounters.ReadHits;
01887 fCounters.ReadMissRate = ( fCounters.ReadRequests ? (float)fCounters.ReadMisses / fCounters.ReadRequests : 0 );
01888
01889 memcpy( cnt, &fCounters, sizeof(fCounters));
01890 return true;
01891 }
01892
01893
01894
01895 void XrdClient::PrintCounters() {
01896
01897 if (DebugLevel() < XrdClientDebug::kUSERDEBUG) return;
01898
01899 XrdClientCounters cnt;
01900 GetCounters(&cnt);
01901
01902 printf("XrdClient counters:\n");;
01903 printf(" ReadBytes: %lld\n", cnt.ReadBytes );
01904 printf(" WrittenBytes: %lld\n", cnt.WrittenBytes );
01905 printf(" WriteRequests: %lld\n", cnt.WriteRequests );
01906
01907 printf(" ReadRequests: %lld\n", cnt.ReadRequests );
01908 printf(" ReadMisses: %lld\n", cnt.ReadMisses );
01909 printf(" ReadHits: %lld\n", cnt.ReadHits );
01910 printf(" ReadMissRate: %f\n", cnt.ReadMissRate );
01911
01912 printf(" ReadVRequests: %lld\n", cnt.ReadVRequests );
01913 printf(" ReadVSubRequests: %lld\n", cnt.ReadVSubRequests );
01914 printf(" ReadVSubChunks: %lld\n", cnt.ReadVSubChunks );
01915 printf(" ReadVBytes: %lld\n", cnt.ReadVBytes );
01916
01917 printf(" ReadVAsyncRequests: %lld\n", cnt.ReadVAsyncRequests );
01918 printf(" ReadVAsyncSubRequests: %lld\n", cnt.ReadVAsyncSubRequests );
01919 printf(" ReadVAsyncSubChunks: %lld\n", cnt.ReadVAsyncSubChunks );
01920 printf(" ReadVAsyncBytes: %lld\n", cnt.ReadVAsyncBytes );
01921
01922 printf(" ReadAsyncRequests: %lld\n", cnt.ReadAsyncRequests );
01923 printf(" ReadAsyncBytes: %lld\n\n", cnt.ReadAsyncBytes );
01924
01925 }
01926
01927