XrdClientReadV.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdClientReadV                                                       //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (INFN Padova, 2006)                          //
00006 //                                                                      //
00007 // Helper functions for the vectored read functionality                 //
00008 //                                                                      //
00009 //////////////////////////////////////////////////////////////////////////
00010 
00011 //         $Id: XrdClientReadV.cc 32231 2010-02-05 18:24:46Z ganis $
00012 
00013 const char *XrdClientReadVCVSID = "$Id: XrdClientReadV.cc 32231 2010-02-05 18:24:46Z ganis $";
00014 
00015 #include "XrdClient/XrdClientReadV.hh"
00016 #include "XrdClient/XrdClientConn.hh"
00017 #include "XrdClient/XrdClientDebug.hh"
00018 #include <memory.h>
00019 
00020 // Builds a request and sends it to the server
00021 // If destbuf == 0 the request is sent asynchronously
00022 // nbuf returns the number of processed buffers
00023 kXR_int64 XrdClientReadV::ReqReadV(XrdClientConn *xrdc, char *handle, char *destbuf,
00024                                    XrdClientVector<XrdClientReadVinfo> &reqvect,
00025                                    int firstreq, int nreq, int streamtosend) {
00026 
00027     readahead_list buflis[READV_MAXCHUNKS];
00028 
00029     Info(XrdClientDebug::kUSERDEBUG, "ReqReadV",
00030          "Requesting to read " << nreq <<
00031          " chunks.");
00032 
00033     kXR_int64 total_len = 0;
00034 
00035     // Now we build the protocol-ready read ahead list
00036     //  and also put the correct placeholders inside the cache
00037     for (int i = 0; i < nreq; i++) {
00038 
00039             memcpy( &(buflis[i].fhandle), handle, 4 ); 
00040 
00041 
00042             if (!destbuf)
00043                 xrdc->SubmitPlaceholderToCache(reqvect[firstreq+i].offset,
00044                                                reqvect[firstreq+i].offset +
00045                                                reqvect[firstreq+i].len-1);
00046 
00047             buflis[i].offset = reqvect[firstreq+i].offset;
00048             buflis[i].rlen = reqvect[firstreq+i].len;
00049             total_len += buflis[i].rlen;
00050     }
00051 
00052     if (nreq > 0) {
00053 
00054         // Prepare a request header 
00055         ClientRequest readvFileRequest;
00056         memset( &readvFileRequest, 0, sizeof(readvFileRequest) );
00057         xrdc->SetSID(readvFileRequest.header.streamid);
00058         readvFileRequest.header.requestid = kXR_readv;
00059         readvFileRequest.readv.dlen = nreq * sizeof(struct readahead_list);
00060 
00061         if (destbuf) {
00062             // A buffer able to hold the data and the info about the chunks
00063             char *res_buf = new char[total_len + (nreq * sizeof(struct readahead_list))];
00064 
00065             clientMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
00066             bool r = xrdc->SendGenCommand(&readvFileRequest, buflis, 0, 
00067                                           (void *)res_buf, FALSE, (char *)"ReadV");
00068             clientUnMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
00069 
00070             if ( r ) {
00071         
00072                 total_len = UnpackReadVResp(destbuf, res_buf,
00073                                             xrdc->LastServerResp.dlen,
00074                                             buflis,
00075                                             nreq);
00076             }
00077             else
00078               total_len = -1;
00079         
00080             delete [] res_buf;
00081         }
00082         else {
00083           clientMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
00084           if (xrdc->WriteToServer_Async(&readvFileRequest,
00085                                         buflis) != kOK )
00086             total_len = 0;
00087 
00088         }
00089 
00090     }
00091 
00092     Info(XrdClientDebug::kHIDEBUG, "ReqReadV",
00093          "Returning: total_len " << total_len);
00094     return total_len;
00095 }
00096 
00097 
00098 // Picks a readv response and puts the individual chunks into the dest buffer
00099 kXR_int32 XrdClientReadV::UnpackReadVResp(char *destbuf, char *respdata, kXR_int32 respdatalen,
00100                                     readahead_list *buflis, int nbuf) {
00101 
00102     int res = respdatalen;
00103 
00104     // I just rebuild the readahead_list element
00105     struct readahead_list header;
00106     kXR_int32 pos_from = 0, pos_to = 0;
00107     int i = 0;
00108     kXR_int64 cur_buf_offset = -1;
00109     int cur_buf_len = 0, cur_buf = 0;
00110     
00111     while ( (pos_from < respdatalen) && (i < nbuf) ) {
00112         memcpy(&header, respdata + pos_from, sizeof(struct readahead_list));
00113        
00114         kXR_int64 tmpl;
00115         memcpy(&tmpl, &header.offset, sizeof(kXR_int64) );
00116         tmpl = ntohll(tmpl);
00117         memcpy(&header.offset, &tmpl, sizeof(kXR_int64) );
00118 
00119         header.rlen  = ntohl(header.rlen);       
00120 
00121         // Do some consistency checks
00122         if (cur_buf_len == 0) {
00123            cur_buf_offset = header.offset;
00124            if (cur_buf_offset != buflis[cur_buf].offset) {
00125               res = -1;  
00126               break;
00127            }
00128            cur_buf_len += header.rlen;
00129            if (cur_buf_len == buflis[cur_buf].rlen) {
00130               cur_buf++;
00131               cur_buf_len = 0;
00132            }
00133         }
00134 
00135         pos_from += sizeof(struct readahead_list);
00136         memcpy( &destbuf[pos_to], &respdata[pos_from], header.rlen);
00137         pos_from += header.rlen;
00138         pos_to += header.rlen;
00139         i++;
00140     }
00141 
00142     if (pos_from != respdatalen || i != nbuf)
00143        Error("UnpackReadVResp","Inconsistency: pos_from " << pos_from <<
00144              " respdatalen " << respdatalen <<
00145              " i " << i <<
00146              " nbuf " << nbuf );
00147 
00148     if (res > 0)
00149       res = pos_to;
00150 
00151     return res;
00152 }
00153 
00154 // Picks a readv response and puts the individual chunks into the cache
00155 int XrdClientReadV::SubmitToCacheReadVResp(XrdClientConn *xrdc, char *respdata,
00156                                            kXR_int32 respdatalen) {
00157 
00158     // This probably means that the server doesnt support ReadV
00159     // ( old version of the server )
00160     int res = -1;
00161 
00162 
00163         res = respdatalen;
00164 
00165         // I just rebuild the readahead_list element
00166 
00167         struct readahead_list header;
00168         kXR_int32 pos_from = 0;
00169         kXR_int32 rlen = 0;
00170         kXR_int64 offs=0;
00171 
00172 //      // Just to log the entries
00173 //      while ( pos_from < respdatalen ) {
00174 //          header = ( readahead_list * )(respdata + pos_from);
00175 
00176 //          memcpy(&offs, &header->offset, sizeof(kXR_int64) );
00177 //          offs = ntohll(offs);
00178 //          rlen = ntohl(header->rlen);   
00179 
00180 //          pos_from += sizeof(struct readahead_list);
00181 
00182 //          Info(XrdClientDebug::kHIDEBUG, "ReadV",
00183 //               "Received chunk " << rlen << " @ " << offs );
00184 
00185 //          pos_from += rlen;
00186 //      }
00187 
00188         pos_from = 0;
00189 
00190 
00191         while ( pos_from < respdatalen ) {
00192             memcpy(&header, respdata + pos_from, sizeof(struct readahead_list));
00193 
00194             offs = ntohll(header.offset);
00195             rlen = ntohl(header.rlen);      
00196 
00197             pos_from += sizeof(struct readahead_list);
00198 
00199             // NOTE: we must duplicate the buffer to be submitted, since a cache block has to be
00200             // contained in one single memblock, while here we have one for multiple chunks.
00201             void *newbuf = malloc(rlen);
00202             memcpy(newbuf, &respdata[pos_from], rlen);
00203 
00204             xrdc->SubmitRawDataToCache(newbuf, offs, offs + rlen - 1);
00205 
00206             pos_from += rlen;
00207 
00208         }
00209         res = pos_from;
00210 
00211         free( respdata );
00212 
00213     return res;
00214 
00215 
00216 
00217 }
00218 
00219 
00220 
00221 void XrdClientReadV::PreProcessChunkRequest(XrdClientVector<XrdClientReadVinfo> &reqvect,
00222                                             kXR_int64 offs, kXR_int32 len,
00223                                             kXR_int64 filelen,
00224                                             kXR_int32 spltsize) {
00225   // Process a single subchunk request, eventually splitting it into more than one
00226 
00227   kXR_int32 len_ok = 0;
00228   kXR_int32 newlen = xrdmin(filelen - offs, len);
00229   
00230   // We want blocks whose len does not exceed READV_MAXCHUNKSIZE
00231   spltsize = xrdmin(spltsize, READV_MAXCHUNKSIZE);
00232 
00233   while (len_ok < newlen) {
00234     XrdClientReadVinfo nfo;
00235 
00236     nfo.offset = offs+len_ok;
00237     nfo.len = xrdmin(newlen-len_ok, spltsize);
00238 
00239     reqvect.Push_back(nfo);
00240 
00241     len_ok += nfo.len;
00242   }
00243   
00244 }
00245 
00246 

Generated on Tue Jul 5 14:46:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1