00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdClientReadVCVSID = "$Id: XrdClientReadV.cc 32231 2010-02-05 18:24:46Z ganis $";
00014
00015 #include "XrdClient/XrdClientReadV.hh"
00016 #include "XrdClient/XrdClientConn.hh"
00017 #include "XrdClient/XrdClientDebug.hh"
00018 #include <memory.h>
00019
00020
00021
00022
00023 kXR_int64 XrdClientReadV::ReqReadV(XrdClientConn *xrdc, char *handle, char *destbuf,
00024 XrdClientVector<XrdClientReadVinfo> &reqvect,
00025 int firstreq, int nreq, int streamtosend) {
00026
00027 readahead_list buflis[READV_MAXCHUNKS];
00028
00029 Info(XrdClientDebug::kUSERDEBUG, "ReqReadV",
00030 "Requesting to read " << nreq <<
00031 " chunks.");
00032
00033 kXR_int64 total_len = 0;
00034
00035
00036
00037 for (int i = 0; i < nreq; i++) {
00038
00039 memcpy( &(buflis[i].fhandle), handle, 4 );
00040
00041
00042 if (!destbuf)
00043 xrdc->SubmitPlaceholderToCache(reqvect[firstreq+i].offset,
00044 reqvect[firstreq+i].offset +
00045 reqvect[firstreq+i].len-1);
00046
00047 buflis[i].offset = reqvect[firstreq+i].offset;
00048 buflis[i].rlen = reqvect[firstreq+i].len;
00049 total_len += buflis[i].rlen;
00050 }
00051
00052 if (nreq > 0) {
00053
00054
00055 ClientRequest readvFileRequest;
00056 memset( &readvFileRequest, 0, sizeof(readvFileRequest) );
00057 xrdc->SetSID(readvFileRequest.header.streamid);
00058 readvFileRequest.header.requestid = kXR_readv;
00059 readvFileRequest.readv.dlen = nreq * sizeof(struct readahead_list);
00060
00061 if (destbuf) {
00062
00063 char *res_buf = new char[total_len + (nreq * sizeof(struct readahead_list))];
00064
00065 clientMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
00066 bool r = xrdc->SendGenCommand(&readvFileRequest, buflis, 0,
00067 (void *)res_buf, FALSE, (char *)"ReadV");
00068 clientUnMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
00069
00070 if ( r ) {
00071
00072 total_len = UnpackReadVResp(destbuf, res_buf,
00073 xrdc->LastServerResp.dlen,
00074 buflis,
00075 nreq);
00076 }
00077 else
00078 total_len = -1;
00079
00080 delete [] res_buf;
00081 }
00082 else {
00083 clientMarshallReadAheadList(buflis, readvFileRequest.readv.dlen);
00084 if (xrdc->WriteToServer_Async(&readvFileRequest,
00085 buflis) != kOK )
00086 total_len = 0;
00087
00088 }
00089
00090 }
00091
00092 Info(XrdClientDebug::kHIDEBUG, "ReqReadV",
00093 "Returning: total_len " << total_len);
00094 return total_len;
00095 }
00096
00097
00098
00099 kXR_int32 XrdClientReadV::UnpackReadVResp(char *destbuf, char *respdata, kXR_int32 respdatalen,
00100 readahead_list *buflis, int nbuf) {
00101
00102 int res = respdatalen;
00103
00104
00105 struct readahead_list header;
00106 kXR_int32 pos_from = 0, pos_to = 0;
00107 int i = 0;
00108 kXR_int64 cur_buf_offset = -1;
00109 int cur_buf_len = 0, cur_buf = 0;
00110
00111 while ( (pos_from < respdatalen) && (i < nbuf) ) {
00112 memcpy(&header, respdata + pos_from, sizeof(struct readahead_list));
00113
00114 kXR_int64 tmpl;
00115 memcpy(&tmpl, &header.offset, sizeof(kXR_int64) );
00116 tmpl = ntohll(tmpl);
00117 memcpy(&header.offset, &tmpl, sizeof(kXR_int64) );
00118
00119 header.rlen = ntohl(header.rlen);
00120
00121
00122 if (cur_buf_len == 0) {
00123 cur_buf_offset = header.offset;
00124 if (cur_buf_offset != buflis[cur_buf].offset) {
00125 res = -1;
00126 break;
00127 }
00128 cur_buf_len += header.rlen;
00129 if (cur_buf_len == buflis[cur_buf].rlen) {
00130 cur_buf++;
00131 cur_buf_len = 0;
00132 }
00133 }
00134
00135 pos_from += sizeof(struct readahead_list);
00136 memcpy( &destbuf[pos_to], &respdata[pos_from], header.rlen);
00137 pos_from += header.rlen;
00138 pos_to += header.rlen;
00139 i++;
00140 }
00141
00142 if (pos_from != respdatalen || i != nbuf)
00143 Error("UnpackReadVResp","Inconsistency: pos_from " << pos_from <<
00144 " respdatalen " << respdatalen <<
00145 " i " << i <<
00146 " nbuf " << nbuf );
00147
00148 if (res > 0)
00149 res = pos_to;
00150
00151 return res;
00152 }
00153
00154
00155 int XrdClientReadV::SubmitToCacheReadVResp(XrdClientConn *xrdc, char *respdata,
00156 kXR_int32 respdatalen) {
00157
00158
00159
00160 int res = -1;
00161
00162
00163 res = respdatalen;
00164
00165
00166
00167 struct readahead_list header;
00168 kXR_int32 pos_from = 0;
00169 kXR_int32 rlen = 0;
00170 kXR_int64 offs=0;
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188 pos_from = 0;
00189
00190
00191 while ( pos_from < respdatalen ) {
00192 memcpy(&header, respdata + pos_from, sizeof(struct readahead_list));
00193
00194 offs = ntohll(header.offset);
00195 rlen = ntohl(header.rlen);
00196
00197 pos_from += sizeof(struct readahead_list);
00198
00199
00200
00201 void *newbuf = malloc(rlen);
00202 memcpy(newbuf, &respdata[pos_from], rlen);
00203
00204 xrdc->SubmitRawDataToCache(newbuf, offs, offs + rlen - 1);
00205
00206 pos_from += rlen;
00207
00208 }
00209 res = pos_from;
00210
00211 free( respdata );
00212
00213 return res;
00214
00215
00216
00217 }
00218
00219
00220
00221 void XrdClientReadV::PreProcessChunkRequest(XrdClientVector<XrdClientReadVinfo> &reqvect,
00222 kXR_int64 offs, kXR_int32 len,
00223 kXR_int64 filelen,
00224 kXR_int32 spltsize) {
00225
00226
00227 kXR_int32 len_ok = 0;
00228 kXR_int32 newlen = xrdmin(filelen - offs, len);
00229
00230
00231 spltsize = xrdmin(spltsize, READV_MAXCHUNKSIZE);
00232
00233 while (len_ok < newlen) {
00234 XrdClientReadVinfo nfo;
00235
00236 nfo.offset = offs+len_ok;
00237 nfo.len = xrdmin(newlen-len_ok, spltsize);
00238
00239 reqvect.Push_back(nfo);
00240
00241 len_ok += nfo.len;
00242 }
00243
00244 }
00245
00246