XrdXrootdXeqAio.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                    X r d X r o o t d X e q A i o . c c                     */
00004 /*                                                                            */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC03-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //        $Id: XrdXrootdXeqAio.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
00013 const char *XrdXrootdAiocbCVSID = "$Id: XrdXrootdXeqAio.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <unistd.h>
00016 
00017 #include "Xrd/XrdBuffer.hh"
00018 #include "Xrd/XrdLink.hh"
00019 #include "XrdSys/XrdSysError.hh"
00020 #include "XrdOuc/XrdOucErrInfo.hh"
00021 #include "XrdSfs/XrdSfsInterface.hh"
00022 #include "XrdXrootd/XrdXrootdAio.hh"
00023 #include "XrdXrootd/XrdXrootdFile.hh"
00024 #include "XrdXrootd/XrdXrootdProtocol.hh"
00025 #include "XrdXrootd/XrdXrootdTrace.hh"
00026   
00027 /******************************************************************************/
00028 /*                               G l o b a l s                                */
00029 /******************************************************************************/
00030 
00031 extern  XrdOucTrace  *XrdXrootdTrace;
00032   
00033 /******************************************************************************/
00034 /*                             a i o _ E r r o r                              */
00035 /******************************************************************************/
00036 
00037 int XrdXrootdProtocol::aio_Error(const char *op, int ecode)
00038 {
00039    char *etext, buffer[MAXPATHLEN+80], unkbuff[64];
00040 
00041 // Get the reason for the error
00042 //
00043    if (!(etext = eDest.ec2text(ecode)))
00044       {sprintf(unkbuff, "reason unknown (%d)", ecode); etext = unkbuff;}
00045 
00046 // Format the error message
00047 //
00048     snprintf(buffer,sizeof(buffer),"Unable to %s %s; %s",
00049                     op, myFile->XrdSfsp->FName(), etext);
00050 
00051 // Print it out if debugging is enabled
00052 //
00053 #ifndef NODEBUG
00054     eDest.Emsg("aio_Error", Link->ID, buffer);
00055 #endif
00056 
00057 // Place the error message in the error object and return
00058 //
00059     myFile->XrdSfsp->error.setErrInfo(ecode, buffer);
00060 
00061 // Prepare for recovery
00062 //
00063    myAioReq = 0;
00064    return -EIO;
00065 }
00066   
00067 /******************************************************************************/
00068 /*                              a i o _ R e a d                               */
00069 /******************************************************************************/
00070   
00071 // Implied Arguments:
00072 
00073 // myFile   = file to be read
00074 // myOffset = Offset at which to read
00075 // myIOLen  = Number of bytes to read from file and write to socket
00076 
00077 // Returns:
00078 // >0      -> n/a
00079 // =0      -> OK to continue with next operation.
00080 // -EAGAIN -> Revert to synchronous I/O
00081 // <0      -> Error, close link.
00082   
00083 int XrdXrootdProtocol::aio_Read()
00084 {
00085    XrdXrootdAioReq *arp;
00086 
00087 // Allocate a request object to handle this request and fire off the first
00088 // i/o (they are self-sustaining after that). Any errors at this point will
00089 // force us to revert to synchronous i/o.
00090 //
00091    if (!(arp=XrdXrootdAioReq::Alloc(this,'r',2)) || arp->Read()) return -EAGAIN;
00092 
00093 // All done
00094 //
00095    return 0;
00096 }
00097 
00098 /******************************************************************************/
00099 /*                             a i o _ W r i t e                              */
00100 /******************************************************************************/
00101   
00102 // Implied Arguments:
00103 
00104 // myFile   = file to be read
00105 // myOffset = Offset at which to read
00106 // myIOLen  = Number of bytes to read from file and write to socket
00107 // myStalls = Number of stalls encountered last time we did I/O
00108 
00109 // Returns:
00110 // >0           -> Slow link, enable link and wait for more data.
00111 // =0           -> OK to continue with next operation.
00112 // -EAGAIN      -> Revert to synchronous I/O
00113 // -EINPROGRESS -> Ran out of aio objects, leave link disabled
00114 // -EIO         -> File system error, flush link.
00115 // <0           -> Error, close link.
00116   
00117 int XrdXrootdProtocol::aio_Write()
00118 {
00119 
00120 // Allocate a request object to handle this request
00121 //
00122    if (!(myAioReq = XrdXrootdAioReq::Alloc(this, 'w'))) return -EAGAIN;
00123 
00124 // Since the socket is synchronous in delivering data to write; only one
00125 // write async request can occur at one time, though several may be in-flight
00126 // after we drain the socket of data. While draining, we remember the AioReq
00127 // object in case we must suspend operations and start the flow.
00128 //
00129    return aio_WriteAll();
00130 }
00131 
00132 /******************************************************************************/
00133 /*                          a i o _ W r i t e A l l                           */
00134 /******************************************************************************/
00135   
00136 // myFile   = file to be read
00137 // myOffset = Offset at which to read
00138 // myIOLen  = Number of bytes to read from file and write to socket
00139 // myAioReq = -> Aio Request
00140 
00141 // The steps taken are:
00142 // 1) Obtain an aio object. If none available, a redrive will be scheduled for
00143 //    the protocol and we return -EINPROGRESS which will keep the link disabled. 
00144 
00145 // 2) Read the data from the link into the buffer using getData().
00146 
00147 // 3) If the link is slow, return a 1 which will re-enable the link and
00148 //    redrive the protocol when data is available. We will resume in 
00149 //    aio_WriteCont() when the buffer has the required amount of data.
00150 
00151 // 4) If the read from the link indicated an error then abort the operation
00152 //    by recycling the AioReq object which will synchronize in-flight i/o.
00153 
00154 // 5) Schedule the aio write. Errors will scuttle the operation and proceed to
00155 //    flush the socket. The write() call will appropriately recycle the AioReq
00156 //    object. We note that no error should be returned if aio resources are 
00157 //    exhausted, the underlying implementation must revert to synchronous 
00158 //    handling. That's a lot of overhead but we'll back off.
00159 
00160 int XrdXrootdProtocol::aio_WriteAll()
00161 {
00162    XrdXrootdAio *aiop;
00163    size_t Quantum;
00164    int rc = 0;
00165 
00166    if (myStalls) myStalls--;
00167 
00168    while (myIOLen > 0)
00169 /*1*/    {if (!(aiop = myAioReq->getAio()))
00170              {Resume = &XrdXrootdProtocol::aio_WriteAll;
00171               myBlen = 0;
00172               return -EINPROGRESS;
00173              }
00174 
00175 /*2*/     Quantum = (aiop->buffp->bsize > myIOLen ? myIOLen
00176                                                   : aiop->buffp->bsize);
00177           if ((rc = getData("aiodata", aiop->buffp->buff, Quantum)))
00178 /*3*/       {if (rc > 0)
00179                 {Resume = &XrdXrootdProtocol::aio_WriteCont;
00180                  myBlast = Quantum;
00181                  myAioReq->Push(aiop);
00182                  myStalls++;
00183                  return 1;
00184                 }
00185 /*4*/        myAioReq->Recycle(-1, aiop);
00186              break;
00187             }
00188 /*5*/    aiop->sfsAio.aio_nbytes = Quantum;
00189          aiop->sfsAio.aio_offset = myOffset;
00190          myIOLen  -= Quantum; myOffset += Quantum;
00191          if ((rc = myAioReq->Write(aiop))) return aio_Error("write", rc);
00192          }
00193 
00194 // We have completed
00195 //
00196    if (myStalls <= as_maxstalls) myStalls = 0;
00197    myAioReq = 0;
00198    Resume   = 0;
00199    return rc;
00200 }
00201 
00202 /******************************************************************************/
00203 /*                         a i o _ W r i t e C o n t                          */
00204 /******************************************************************************/
00205 
00206 // myFile   = file to be written
00207 // myOffset = Offset at which to write
00208 // myIOLen  = Number of bytes to read from socket and write to file
00209 // myBlast  = Number of bytes already read from the socket
00210 // myAio    = Pointer to the XrdXrootdAioReq object.
00211   
00212 int XrdXrootdProtocol::aio_WriteCont()
00213 {
00214    XrdXrootdAio *aiop = myAioReq->Pop();
00215    int rc;
00216 
00217 // Write data that was finaly finished comming in. Note that we could simply
00218 // pick up the current aio object without locks since this is synchronized
00219 // via protocol object scheduling (only one can occur at a time).
00220 //
00221    if ((rc = myAioReq->Write(aiop)))
00222       {myIOLen  = myIOLen-myBlast;
00223        return aio_Error("write", rc);
00224       }
00225     myOffset += myBlast; myIOLen -= myBlast;
00226 
00227 // Either continue the request or return to enable the link
00228 //
00229    if (myIOLen > 0) return aio_WriteAll();
00230    myAioReq = 0;
00231    return 0;
00232 }

Generated on Tue Jul 5 14:47:04 2011 for ROOT_528-00b_version by  doxygen 1.5.1