00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Revision-control identification string for this source file.
const char *XrdXrootdAioCVSID =
      "$Id: XrdXrootdAio.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <unistd.h>
00016
00017 #include "Xrd/XrdBuffer.hh"
00018 #include "Xrd/XrdLink.hh"
00019 #include "XrdSys/XrdSysError.hh"
00020 #include "XrdSys/XrdSysPthread.hh"
00021 #include "XrdSfs/XrdSfsInterface.hh"
00022 #include "XrdXrootd/XrdXrootdAio.hh"
00023 #include "XrdXrootd/XrdXrootdFile.hh"
00024 #include "XrdXrootd/XrdXrootdProtocol.hh"
00025 #include "XrdXrootd/XrdXrootdStats.hh"
00026 #include "XrdXrootd/XrdXrootdTrace.hh"
00027
00028
00029
00030
00031
00032 XrdBuffManager *XrdXrootdAio::BPool;
00033 XrdScheduler *XrdXrootdAio::Sched;
00034 XrdXrootdStats *XrdXrootdAio::SI;
00035
00036 XrdSysMutex XrdXrootdAio::fqMutex;
00037 XrdXrootdAio *XrdXrootdAio::fqFirst = 0;
00038 const char *XrdXrootdAio::TraceID = "Aio";
00039
00040 int XrdXrootdAio::maxAio;
00041
00042 XrdSysError *XrdXrootdAioReq::eDest;
00043 XrdSysMutex XrdXrootdAioReq::rqMutex;
00044 XrdXrootdAioReq *XrdXrootdAioReq::rqFirst = 0;
00045 const char *XrdXrootdAioReq::TraceID = "AioReq";
00046
00047 int XrdXrootdAioReq::QuantumMin;
00048 int XrdXrootdAioReq::Quantum;
00049 int XrdXrootdAioReq::QuantumMax;
00050 int XrdXrootdAioReq::maxAioPR = 8;
00051 int XrdXrootdAioReq::maxAioPR2 =16;
00052
00053 extern XrdOucTrace *XrdXrootdTrace;
00054
00055
00056
00057
00058
00059 XrdXrootdAio *XrdXrootdAio::Alloc(XrdXrootdAioReq *arp, int bsize)
00060 {
00061 XrdXrootdAio *aiop;
00062
00063
00064
00065 fqMutex.Lock();
00066 if ((aiop = fqFirst)) fqFirst = aiop->Next;
00067 else if (maxAio) aiop = addBlock();
00068 if (aiop && (SI->AsyncNow > SI->AsyncMax)) SI->AsyncMax = SI->AsyncNow;
00069 fqMutex.UnLock();
00070
00071
00072
00073 if (aiop)
00074 {if (bsize && (aiop->buffp = BPool->Obtain(bsize)))
00075 {aiop->sfsAio.aio_buf = (void *)(aiop->buffp->buff);
00076 aiop->aioReq = arp;
00077 aiop->TIdent = arp->Link->ID;
00078 }
00079 else {aiop->Recycle(); aiop = 0;}
00080 }
00081
00082
00083
00084 return aiop;
00085 }
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100 void XrdXrootdAio::doneRead()
00101 {
00102
00103
00104 aioReq->aioDone = this;
00105
00106
00107
00108 if (Result >= 0) aioReq->aioTotal += Result;
00109 else if (!aioReq->aioError) aioReq->aioError = Result;
00110
00111
00112
00113 Sched->Schedule((XrdJob *)aioReq);
00114 }
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125 void XrdXrootdAio::doneWrite()
00126 {
00127 char recycle = 0;
00128
00129
00130
00131 aioReq->Lock();
00132 aioReq->numActive--;
00133
00134
00135
00136 if (Result >= 0) {aioReq->myIOLen -= Result;
00137 aioReq->aioTotal += Result;
00138 }
00139 else if (!aioReq->aioError) aioReq->aioError = Result;
00140
00141
00142
00143
00144 if (aioReq->reDrive)
00145 {Sched->Schedule((XrdJob *)aioReq->Link);
00146 aioReq->reDrive = 0;
00147 }
00148
00149
00150
00151
00152
00153
00154 if (aioReq->myIOLen > 0)
00155 {Next = aioReq->aioFree; aioReq->aioFree = this;}
00156 else {if (!(aioReq->numActive)) Sched->Schedule((XrdJob *)aioReq);
00157 recycle = 1;
00158 }
00159
00160
00161
00162 aioReq->UnLock();
00163 if (recycle) Recycle();
00164 }
00165
00166
00167
00168
00169
00170 void XrdXrootdAio::Recycle()
00171 {
00172
00173
00174
00175 if (buffp) {BPool->Release(buffp); buffp = 0;}
00176
00177
00178
00179 fqMutex.Lock();
00180 Next = fqFirst;
00181 fqFirst = this;
00182 if (--SI->AsyncNow < 0) SI->AsyncNow=0;
00183 fqMutex.UnLock();
00184 }
00185
00186
00187
00188
00189
00190
00191
00192
00193 XrdXrootdAio *XrdXrootdAio::addBlock()
00194 {
00195 const int numalloc = 4096/sizeof(XrdXrootdAio);
00196 int i = (numalloc <= maxAio ? numalloc : maxAio);
00197 XrdXrootdAio *aiop;
00198
00199 TRACE(DEBUG, "Adding " <<i <<" aio objects; " <<maxAio <<" pending.");
00200
00201 if ((aiop = new XrdXrootdAio[i]()))
00202 {maxAio -= i;
00203 while(--i) {aiop->Next = fqFirst; fqFirst = aiop; aiop++;}
00204 }
00205
00206 return aiop;
00207 }
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222 XrdXrootdAioReq *XrdXrootdAioReq::Alloc(XrdXrootdProtocol *prot,
00223 char iotype, int numaio)
00224 {
00225 int i, cntaio, myQuantum, iolen = prot->myIOLen;
00226 XrdXrootdAioReq *arp;
00227 XrdXrootdAio *aiop;
00228
00229
00230
00231 rqMutex.Lock();
00232 if ((arp = rqFirst)) rqFirst = arp->Next;
00233 else arp = addBlock();
00234 rqMutex.UnLock();
00235
00236
00237
00238 if (!arp) return arp;
00239 arp->Clear(prot->Link);
00240 if (!numaio) numaio = maxAioPR;
00241
00242
00243
00244
00245
00246
00247
00248 if (iolen < Quantum)
00249 {myQuantum = QuantumMin;
00250 if (!(cntaio = iolen / myQuantum)) cntaio = 1;
00251 else if (iolen % myQuantum) cntaio++;
00252 } else {cntaio = iolen / Quantum;
00253 if (cntaio <= maxAioPR2) myQuantum = Quantum;
00254 else {myQuantum = QuantumMax;
00255 cntaio = iolen / myQuantum;
00256 }
00257 if (iolen % myQuantum) cntaio++;
00258 }
00259
00260
00261
00262 i = (maxAioPR < cntaio ? maxAioPR : cntaio);
00263 while(i && (aiop = XrdXrootdAio::Alloc(arp, myQuantum)))
00264 {aiop->Next = arp->aioFree; arp->aioFree = aiop; i--;}
00265
00266
00267
00268 if (i && (maxAioPR - i) < 2 && cntaio > 1)
00269 {arp->Recycle(0); return (XrdXrootdAioReq *)0;}
00270
00271
00272
00273 if (iotype != 'w') prot->Link->setRef(1);
00274 arp->Instance = prot->Link->Inst();
00275 arp->myIOLen = iolen;
00276 arp->myOffset = prot->myOffset;
00277 arp->myFile = prot->myFile;
00278 arp->Response = prot->Response;
00279 arp->aioType = iotype;
00280
00281
00282
00283 return arp;
00284 }
00285
00286
00287
00288
00289
00290 XrdXrootdAio *XrdXrootdAioReq::getAio()
00291 {
00292 XrdXrootdAio *aiop;
00293
00294
00295
00296
00297
00298
00299 Lock();
00300 if ((aiop = aioFree)) {aioFree = aiop->Next; aiop->Next = 0;}
00301 else reDrive = 1;
00302 UnLock();
00303 return aiop;
00304 }
00305
00306
00307
00308
00309
00310 void XrdXrootdAioReq::Init(int iosize, int maxaiopr, int maxaio)
00311 {
00312 XrdXrootdAio *aiop;
00313 XrdXrootdAioReq *arp;
00314
00315
00316
00317
00318 XrdXrootdAio::Sched = XrdXrootdProtocol::Sched;
00319 XrdXrootdAio::BPool = XrdXrootdProtocol::BPool;
00320 XrdXrootdAio::SI = XrdXrootdProtocol::SI;
00321
00322
00323
00324 eDest = &XrdXrootdProtocol::eDest;
00325 Quantum = static_cast<size_t>(iosize);
00326 QuantumMin = Quantum / 2;
00327 QuantumMax = Quantum * 2;
00328 if (QuantumMax > XrdXrootdProtocol::maxBuffsz)
00329 QuantumMax = XrdXrootdProtocol::maxBuffsz;
00330
00331
00332
00333
00334
00335 maxAioPR = (maxaiopr < 1 ? 8 : maxaiopr);
00336 maxAioPR2 = maxAioPR * 2;
00337 XrdXrootdAio::maxAio = (maxaio < maxAioPR ? maxAioPR : maxaio);
00338
00339
00340
00341 TRACE(DEBUG, "Max aio/req=" <<maxAioPR
00342 <<"; aio/srv=" <<XrdXrootdAio::maxAio
00343 <<"; Quantum=" <<Quantum);
00344
00345
00346
00347 if ((arp = addBlock())) {arp->Clear(0); arp->Recycle(0);}
00348 if ((aiop = XrdXrootdAio::addBlock())) aiop->Recycle();
00349 }
00350
00351
00352
00353
00354
00355 int XrdXrootdAioReq::Read()
00356 {
00357 int rc;
00358 XrdXrootdAio *aiop;
00359
00360
00361
00362
00363
00364 if (!(aiop = aioFree)) return -ENOBUFS;
00365 aioFree = aiop->Next;
00366 aiop->Next = 0;
00367
00368
00369
00370
00371 aiop->sfsAio.aio_offset = myOffset;
00372 aiop->sfsAio.aio_nbytes = (aiop->buffp->bsize>myIOLen ? myIOLen
00373 : aiop->buffp->bsize);
00374
00375
00376
00377
00378
00379 myIOLen -= aiop->sfsAio.aio_nbytes;
00380 myOffset += aiop->sfsAio.aio_nbytes;
00381 numActive++;
00382 if ((rc = myFile->XrdSfsp->read((XrdSfsAio *)aiop)))
00383 {numActive--; Recycle();}
00384
00385
00386
00387 return rc;
00388 }
00389
00390
00391
00392
00393
00394 void XrdXrootdAioReq::Recycle(int dref, XrdXrootdAio *oldp)
00395 {
00396 XrdXrootdAio *aiop;
00397
00398
00399
00400
00401 if (oldp) oldp->Recycle();
00402
00403
00404
00405
00406
00407 if (dref < 0)
00408 {Lock();
00409 if (numActive)
00410 {aioError = -1; respDone = 1;
00411 UnLock();
00412 return;
00413 }
00414 UnLock();
00415 }
00416
00417
00418
00419 while((aiop = aioDone)) {aioDone = aiop->Next; aiop->Recycle();}
00420 while((aiop = aioFree)) {aioFree = aiop->Next; aiop->Recycle();}
00421
00422
00423
00424 if (Link && dref && aioType != 'w') Link->setRef(-1);
00425
00426
00427
00428 if (isLocked) UnLock();
00429
00430
00431
00432 rqMutex.Lock();
00433 Next = rqFirst;
00434 rqFirst = this;
00435 rqMutex.UnLock();
00436 }
00437
00438
00439
00440
00441
00442 int XrdXrootdAioReq::Write(XrdXrootdAio *aiop)
00443 {
00444 int rc;
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456 Lock(); numActive++; UnLock();
00457 if ((rc = myFile->XrdSfsp->write((XrdSfsAio *)aiop)))
00458 {Lock(); numActive--; UnLock(); Recycle(-1);}
00459
00460
00461
00462 return rc;
00463 }
00464
00465
00466
00467
00468
00469
00470
00471
00472 XrdXrootdAioReq *XrdXrootdAioReq::addBlock()
00473 {
00474 const int numalloc = 4096/sizeof(XrdXrootdAioReq);
00475 int i = numalloc;
00476 XrdXrootdAioReq *arp;
00477
00478 if (!numalloc) return new XrdXrootdAioReq();
00479 TRACE(DEBUG, "Adding " <<numalloc <<" aioreq objects.");
00480
00481 if ((arp = new XrdXrootdAioReq[numalloc]()))
00482 while(--i) {arp->Next = rqFirst; rqFirst = arp; arp++;}
00483
00484 return arp;
00485 }
00486
00487
00488
00489
00490
00491 void XrdXrootdAioReq::Clear(XrdLink *lnkp)
00492 {
00493 Next = 0;
00494 myOffset = 0;
00495 myIOLen = 0;
00496 Instance = 0;
00497 Link = lnkp;
00498 myFile = 0;
00499 aioDone = 0;
00500 aioFree = 0;
00501 numActive = 0;
00502 aioTotal = 0;
00503 aioError = 0;
00504 aioType = 0;
00505 respDone = 0;
00506 isLocked = 0;
00507 reDrive = 0;
00508 }
00509
00510
00511
00512
00513
00514 void XrdXrootdAioReq::endRead()
00515 {
00516 XrdXrootdAio *aiop;
00517 int rc;
00518
00519
00520
00521
00522
00523
00524 Lock();
00525 numActive--;
00526
00527
00528
00529
00530 if (!(Link->isInstance(Instance))) {Scuttle("aio read"); return;}
00531
00532
00533
00534
00535 aiop = aioDone;
00536 aioDone = aiop->Next;
00537
00538
00539
00540 if (aioError
00541 || (myIOLen > 0 && aiop->Result == aiop->buffp->bsize && (aioError=Read())))
00542 {sendError((char *)aiop->TIdent);
00543 Recycle(1, aiop);
00544 return;
00545 }
00546
00547
00548
00549
00550 rc = (numActive ?
00551 Response.Send(kXR_oksofar, aiop->buffp->buff, aiop->Result) :
00552 Response.Send( aiop->buffp->buff, aiop->Result));
00553
00554
00555
00556
00557 if (!numActive)
00558 {myFile->readCnt += aioTotal;
00559 Recycle(1, aiop);
00560 }
00561 else {aiop->Next = aioFree, aioFree = aiop;
00562 if (rc < 0) {aioError = -1; respDone = 1;}
00563 UnLock();
00564 }
00565 }
00566
00567
00568
00569
00570
00571 void XrdXrootdAioReq::endWrite()
00572 {
00573
00574
00575
00576
00577
00578
00579 if (!(Link->isInstance(Instance))) {Scuttle("aio write"); return;}
00580
00581
00582
00583 if (aioError) sendError(Link->ID);
00584 else Response.Send();
00585
00586
00587
00588
00589 myFile->writeCnt += aioTotal;
00590
00591
00592
00593 Recycle();
00594 }
00595
00596
00597
00598
00599
00600 void XrdXrootdAioReq::Scuttle(const char *opname)
00601 {
00602
00603
00604
00605 eDest->Emsg("scuttle",opname,"failed; link reassigned to",Link->ID);
00606
00607
00608
00609
00610 Recycle(0);
00611 }
00612
00613
00614
00615
00616
00617
00618
00619 void XrdXrootdAioReq::sendError(char *tident)
00620 {
00621 char mbuff[4096];
00622 int rc;
00623
00624
00625
00626 if (respDone) return;
00627 respDone = 1;
00628
00629
00630
00631
00632 snprintf(mbuff, sizeof(mbuff)-1, "XrdXrootdAio: Unable to %s %s; %s",
00633 (aioType == 'r' ? "read" : "write"), myFile->XrdSfsp->FName(),
00634 eDest->ec2text(aioError));
00635
00636
00637
00638 eDest->Emsg("aio", tident, mbuff);
00639
00640
00641
00642 rc = XrdXrootdProtocol::mapError(aioError);
00643
00644
00645
00646 Response.Send((XErrorCode)rc, mbuff);
00647 }