00001 /******************************************************************************/ 00002 /* */ 00003 /* X r d X r o o t d X e q A i o . c c */ 00004 /* */ 00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */ 00006 /* All Rights Reserved */ 00007 /* Produced by Andrew Hanushevsky for Stanford University under contract */ 00008 /* DE-AC03-76-SFO0515 with the Department of Energy */ 00009 /******************************************************************************/ 00010 00011 // $Id: XrdXrootdXeqAio.cc 35287 2010-09-14 21:19:35Z ganis $ 00012 00013 const char *XrdXrootdAiocbCVSID = "$Id: XrdXrootdXeqAio.cc 35287 2010-09-14 21:19:35Z ganis $"; 00014 00015 #include <unistd.h> 00016 00017 #include "Xrd/XrdBuffer.hh" 00018 #include "Xrd/XrdLink.hh" 00019 #include "XrdSys/XrdSysError.hh" 00020 #include "XrdOuc/XrdOucErrInfo.hh" 00021 #include "XrdSfs/XrdSfsInterface.hh" 00022 #include "XrdXrootd/XrdXrootdAio.hh" 00023 #include "XrdXrootd/XrdXrootdFile.hh" 00024 #include "XrdXrootd/XrdXrootdProtocol.hh" 00025 #include "XrdXrootd/XrdXrootdTrace.hh" 00026 00027 /******************************************************************************/ 00028 /* G l o b a l s */ 00029 /******************************************************************************/ 00030 00031 extern XrdOucTrace *XrdXrootdTrace; 00032 00033 /******************************************************************************/ 00034 /* a i o _ E r r o r */ 00035 /******************************************************************************/ 00036 00037 int XrdXrootdProtocol::aio_Error(const char *op, int ecode) 00038 { 00039 char *etext, buffer[MAXPATHLEN+80], unkbuff[64]; 00040 00041 // Get the reason for the error 00042 // 00043 if (!(etext = eDest.ec2text(ecode))) 00044 {sprintf(unkbuff, "reason unknown (%d)", ecode); etext = unkbuff;} 00045 00046 // Format the error message 00047 // 00048 snprintf(buffer,sizeof(buffer),"Unable to %s %s; %s", 00049 op, myFile->XrdSfsp->FName(), etext); 00050 00051 // Print it out if debugging is enabled 00052 // 00053 #ifndef NODEBUG 00054 eDest.Emsg("aio_Error", Link->ID, buffer); 00055 #endif 00056 00057 // Place the error message in the error object and return 00058 // 00059 myFile->XrdSfsp->error.setErrInfo(ecode, buffer); 00060 00061 // Prepare for recovery 00062 // 00063 myAioReq = 0; 00064 return -EIO; 00065 } 00066 00067 /******************************************************************************/ 00068 /* a i o _ R e a d */ 00069 /******************************************************************************/ 00070 00071 // Implied Arguments: 00072 00073 // myFile = file to be read 00074 // myOffset = Offset at which to read 00075 // myIOLen = Number of bytes to read from file and write to socket 00076 00077 // Returns: 00078 // >0 -> n/a 00079 // =0 -> OK to continue with next operation. 00080 // -EAGAIN -> Revert to synchronous I/O 00081 // <0 -> Error, close link. 00082 00083 int XrdXrootdProtocol::aio_Read() 00084 { 00085 XrdXrootdAioReq *arp; 00086 00087 // Allocate a request object to handle this request and fire off the first 00088 // i/o (they are self-sustaining after that). Any errors at this point will 00089 // force us to revert to synchronous i/o. 00090 // 00091 if (!(arp=XrdXrootdAioReq::Alloc(this,'r',2)) || arp->Read()) return -EAGAIN; 00092 00093 // All done 00094 // 00095 return 0; 00096 } 00097 00098 /******************************************************************************/ 00099 /* a i o _ W r i t e */ 00100 /******************************************************************************/ 00101 00102 // Implied Arguments: 00103 00104 // myFile = file to be read 00105 // myOffset = Offset at which to read 00106 // myIOLen = Number of bytes to read from file and write to socket 00107 // myStalls = Number of stalls encountered last time we did I/O 00108 00109 // Returns: 00110 // >0 -> Slow link, enable link and wait for more data. 00111 // =0 -> OK to continue with next operation. 00112 // -EAGAIN -> Revert to synchronous I/O 00113 // -EINPROGRESS -> Ran out of aio objects, leave link disabled 00114 // -EIO -> File system error, flush link. 00115 // <0 -> Error, close link. 00116 00117 int XrdXrootdProtocol::aio_Write() 00118 { 00119 00120 // Allocate a request object to handle this request 00121 // 00122 if (!(myAioReq = XrdXrootdAioReq::Alloc(this, 'w'))) return -EAGAIN; 00123 00124 // Since the socket is synchronous in delivering data to write; only one 00125 // write async request can occur at one time, though several may be in-flight 00126 // after we drain the socket of data. While draining, we remember the AioReq 00127 // object in case we must suspend operations and start the flow. 00128 // 00129 return aio_WriteAll(); 00130 } 00131 00132 /******************************************************************************/ 00133 /* a i o _ W r i t e A l l */ 00134 /******************************************************************************/ 00135 00136 // myFile = file to be read 00137 // myOffset = Offset at which to read 00138 // myIOLen = Number of bytes to read from file and write to socket 00139 // myAioReq = -> Aio Request 00140 00141 // The steps taken are: 00142 // 1) Obtain an aio object. If none available, a redrive will be scheduled for 00143 // the protocol and we return -EINPROGRESS which will keep the link disabled. 00144 00145 // 2) Read the data from the link into the buffer using getData(). 00146 00147 // 3) If the link is slow, return a 1 which will re-enable the link and 00148 // redrive the protocol when data is available. We will resume in 00149 // aio_WriteCont() when the buffer has the required amount of data. 00150 00151 // 4) If the read from the link indicated an error then abort the operation 00152 // by recycling the AioReq object which will synchronize in-flight i/o. 00153 00154 // 5) Schedule the aio write. Errors will scuttle the operation and proceed to 00155 // flush the socket. The write() call will appropriately recycle the AioReq 00156 // object. We note that no error should be returned if aio resources are 00157 // exhausted, the underlying implementation must revert to synchronous 00158 // handling. That's a lot of overhead but we'll back off. 00159 00160 int XrdXrootdProtocol::aio_WriteAll() 00161 { 00162 XrdXrootdAio *aiop; 00163 size_t Quantum; 00164 int rc = 0; 00165 00166 if (myStalls) myStalls--; 00167 00168 while (myIOLen > 0) 00169 /*1*/ {if (!(aiop = myAioReq->getAio())) 00170 {Resume = &XrdXrootdProtocol::aio_WriteAll; 00171 myBlen = 0; 00172 return -EINPROGRESS; 00173 } 00174 00175 /*2*/ Quantum = (aiop->buffp->bsize > myIOLen ? myIOLen 00176 : aiop->buffp->bsize); 00177 if ((rc = getData("aiodata", aiop->buffp->buff, Quantum))) 00178 /*3*/ {if (rc > 0) 00179 {Resume = &XrdXrootdProtocol::aio_WriteCont; 00180 myBlast = Quantum; 00181 myAioReq->Push(aiop); 00182 myStalls++; 00183 return 1; 00184 } 00185 /*4*/ myAioReq->Recycle(-1, aiop); 00186 break; 00187 } 00188 /*5*/ aiop->sfsAio.aio_nbytes = Quantum; 00189 aiop->sfsAio.aio_offset = myOffset; 00190 myIOLen -= Quantum; myOffset += Quantum; 00191 if ((rc = myAioReq->Write(aiop))) return aio_Error("write", rc); 00192 } 00193 00194 // We have completed 00195 // 00196 if (myStalls <= as_maxstalls) myStalls = 0; 00197 myAioReq = 0; 00198 Resume = 0; 00199 return rc; 00200 } 00201 00202 /******************************************************************************/ 00203 /* a i o _ W r i t e C o n t */ 00204 /******************************************************************************/ 00205 00206 // myFile = file to be written 00207 // myOffset = Offset at which to write 00208 // myIOLen = Number of bytes to read from socket and write to file 00209 // myBlast = Number of bytes already read from the socket 00210 // myAio = Pointer to the XrdXrootdAioReq object. 00211 00212 int XrdXrootdProtocol::aio_WriteCont() 00213 { 00214 XrdXrootdAio *aiop = myAioReq->Pop(); 00215 int rc; 00216 00217 // Write data that was finaly finished comming in. Note that we could simply 00218 // pick up the current aio object without locks since this is synchronized 00219 // via protocol object scheduling (only one can occur at a time). 00220 // 00221 if ((rc = myAioReq->Write(aiop))) 00222 {myIOLen = myIOLen-myBlast; 00223 return aio_Error("write", rc); 00224 } 00225 myOffset += myBlast; myIOLen -= myBlast; 00226 00227 // Either continue the request or return to enable the link 00228 // 00229 if (myIOLen > 0) return aio_WriteAll(); 00230 myAioReq = 0; 00231 return 0; 00232 }