00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
// Revision-control identification string ("$Id$" keyword) for this source file.
const char *XrdClientMStreamCVSID =
    "$Id: XrdClientMStream.cc 35060 2010-08-29 08:39:10Z brun $";
00017
00018
00019 #include "XrdClient/XrdClientMStream.hh"
00020 #include "XrdClient/XrdClientLogConnection.hh"
00021 #include "XrdClient/XrdClientEnv.hh"
00022 #include "XrdClient/XrdClientDebug.hh"
00023
00024
00025
// Base value of the temporary ids handed to substreams that are still being
// set up: EstablishParallelStreams() assigns XRDCLI_PSOCKTEMP - i to the
// i-th opener. Parenthesized so the negative literal is safe in any expression.
#define XRDCLI_PSOCKTEMP (-1000)
00027
00028 struct ParStreamOpenerArgs {
00029 XrdClientThread *thr;
00030 XrdClientConn *cliconn;
00031 int wan_port, wan_window;
00032 int tmpid;
00033 };
00034
00035
00036 void *ParStreamOpenerThread(void *arg, XrdClientThread *thr)
00037 {
00038
00039
00040
00041 if (thr->MaskSignal(0) != 0)
00042 Error("ParStreamOpenerThread", "Warning: problems masking signals");
00043
00044 ParStreamOpenerArgs *parms = (ParStreamOpenerArgs *)arg;
00045
00046 XrdClientMStream::AddParallelStream(parms->cliconn, parms->wan_port, parms->wan_window, parms->tmpid);
00047
00048 return 0;
00049 }
00050
00051
00052 int XrdClientMStream::EstablishParallelStreams(XrdClientConn *cliconn) {
00053 int mx = EnvGetLong(NAME_MULTISTREAMCNT);
00054 int i, res;
00055 int wan_port = 0, wan_window = 0;
00056
00057 if (mx <= 1) return 1;
00058 if (cliconn->GetServerType() == kSTBaseXrootd) return 1;
00059
00060
00061 XrdClientPhyConnection *phyconn = XrdClientConn::GetPhyConn(cliconn->GetLogConnID());
00062 if (!phyconn) return 0;
00063
00064
00065
00066 if (phyconn->TestAndSetMStreamsGoing()) return 1;
00067
00068
00069 char *qryitems = (char *)"wan_port wan_window";
00070 ClientRequest qryRequest;
00071 char qryResp[1024];
00072 memset( &qryRequest, 0, sizeof(qryRequest) );
00073 memset( qryResp, 0, 1024 );
00074
00075 cliconn->SetSID(qryRequest.header.streamid);
00076 qryRequest.header.requestid = kXR_query;
00077 qryRequest.query.infotype = kXR_Qconfig;
00078 qryRequest.header.dlen = strlen(qryitems);
00079
00080 res = cliconn->SendGenCommand(&qryRequest, qryitems, 0, qryResp,
00081 false, (char *)"QueryConfig");
00082
00083 if (res && (cliconn->LastServerResp.status == kXR_ok) &&
00084 cliconn->LastServerResp.dlen) {
00085
00086 sscanf(qryResp, "%d\n%d",
00087 &wan_port,
00088 &wan_window);
00089
00090 Info(XrdClientDebug::kUSERDEBUG,
00091 "XrdClientMStream::EstablishParallelStreams", "Server WAN parameters: port=" << wan_port << " windowsize=" << wan_window );
00092 }
00093
00094
00095
00096
00097
00098 ParStreamOpenerArgs paropeners[16];
00099 for (i = 0; i < mx; i++) {
00100 paropeners[i].thr = 0;
00101 paropeners[i].cliconn = cliconn;
00102 paropeners[i].wan_port = wan_port;
00103 paropeners[i].wan_window = wan_window;
00104 paropeners[i].tmpid = 0;
00105 }
00106
00107 for (i = 0; i < mx; i++) {
00108 Info(XrdClientDebug::kHIDEBUG,
00109 "XrdClientMStream::EstablishParallelStreams", "Trying to establish " << i+1 << "th substream." );
00110
00111 paropeners[i].thr = new XrdClientThread(ParStreamOpenerThread);
00112 if (paropeners[i].thr) {
00113 paropeners[i].tmpid = XRDCLI_PSOCKTEMP - i;
00114 if (paropeners[i].thr->Run(&paropeners[i])) {
00115 Error("XrdClientMStream::EstablishParallelStreams", "Error establishing " << i+1 << "th substream. Thread start failed.");
00116 delete paropeners[i].thr;
00117 paropeners[i].thr = 0;
00118 break;
00119 }
00120 }
00121
00122 }
00123
00124 for (i = 0; i < mx; i++)
00125 if (paropeners[i].thr) {
00126 Info(XrdClientDebug::kHIDEBUG,
00127 "XrdClientMStream::EstablishParallelStreams", "Waiting for substream " << i+1 << "." );
00128 paropeners[i].thr->Join(0);
00129 delete paropeners[i].thr;
00130 }
00131
00132
00133
00134
00135
00136
00137 Info(XrdClientDebug::kHIDEBUG,
00138 "XrdClientMStream::EstablishParallelStreams", "Parallel streams establishment finished." );
00139
00140 return i;
00141 }
00142
00143
00144
00145 int XrdClientMStream::AddParallelStream(XrdClientConn *cliconn, int port, int windowsz, int tempid) {
00146
00147 XrdClientPhyConnection *phyconn = XrdClientConn::GetPhyConn(cliconn->GetLogConnID());
00148
00149
00150
00151 if (phyconn->GetSockIdCount() > EnvGetLong(NAME_MULTISTREAMCNT)) return 0;
00152
00153
00154
00155
00156 int sockdescr = phyconn->TryConnectParallelStream(port, windowsz, tempid);
00157
00158 if (sockdescr < 0) return -1;
00159
00160
00161
00162
00163 ServerInitHandShake xbody;
00164 if (phyconn->DoHandShake(xbody, tempid) == kSTError) return -1;
00165
00166
00167 int newid = -1;
00168 int res = -1;
00169 if (BindPendingStream(cliconn, tempid, newid) &&
00170 phyconn->IsValid() ) {
00171
00172
00173 res = phyconn->EstablishPendingParallelStream(tempid, newid);
00174
00175 if (res) {
00176
00177 RemoveParallelStream(cliconn, tempid);
00178 return res;
00179 }
00180
00181
00182 phyconn->UnBanSockDescr(sockdescr);
00183 phyconn->ReinitFDTable();
00184
00185 }
00186 else {
00187
00188 RemoveParallelStream(cliconn, tempid);
00189 return -1;
00190 }
00191
00192 Info(XrdClientDebug::kHIDEBUG,
00193 "XrdClientMStream::EstablishParallelStreams", "Substream added." );
00194 return 0;
00195
00196 }
00197
00198
00199 int XrdClientMStream::RemoveParallelStream(XrdClientConn *cliconn, int substream) {
00200
00201
00202 XrdClientLogConnection *log = ConnectionManager->GetConnection(cliconn->GetLogConnID());
00203 if (!log) return 0;
00204
00205 XrdClientPhyConnection *phyconn = log->GetPhyConnection();
00206
00207 if (phyconn)
00208 phyconn->RemoveParallelStream(substream);
00209
00210 return 0;
00211
00212 }
00213
00214
00215
00216
00217
00218 bool XrdClientMStream::BindPendingStream(XrdClientConn *cliconn, int substreamid, int &newid) {
00219 bool res = false;
00220
00221
00222 ClientRequest bindFileRequest;
00223 XrdClientConn::SessionIDInfo sess;
00224 ServerResponseBody_Bind bndresp;
00225
00226
00227 XrdClientPhyConnection *phyconn =
00228 ConnectionManager->GetConnection(cliconn->GetLogConnID())->GetPhyConnection();
00229
00230 cliconn->GetSessionID(sess);
00231
00232 memset( &bindFileRequest, 0, sizeof(bindFileRequest) );
00233 cliconn->SetSID(bindFileRequest.header.streamid);
00234 bindFileRequest.bind.requestid = kXR_bind;
00235 memcpy( bindFileRequest.bind.sessid, sess.id, sizeof(sess.id) );
00236
00237
00238 clientMarshall(&bindFileRequest);
00239 res = phyconn->WriteRaw(&bindFileRequest, sizeof(bindFileRequest), substreamid);
00240
00241 if (!res) return false;
00242
00243 ServerResponseHeader hdr;
00244 int rdres = 0;
00245
00246
00247 rdres = phyconn->ReadRaw(&hdr, sizeof(ServerResponseHeader), substreamid);
00248
00249 if (rdres < (int)sizeof(ServerResponseHeader)) {
00250 Error("BindPendingStream", "Error reading bind response header for substream " << substreamid << ".");
00251 return false;
00252 }
00253
00254 clientUnmarshall(&hdr);
00255
00256
00257
00258
00259
00260 if (hdr.status != kXR_ok) {
00261 Error("BindPendingStream", "Server denied binding for substream " << substreamid << ".");
00262 return false;
00263 }
00264
00265 if (hdr.dlen != sizeof(bndresp)) {
00266 Error("BindPendingStream", "Unrecognized response datalen binding substream " << substreamid << ".");
00267 return false;
00268 }
00269
00270 rdres = phyconn->ReadRaw(&bndresp, sizeof(bndresp), substreamid);
00271 if (rdres != sizeof(bndresp)) {
00272 Error("BindPendingStream", "Error reading response binding substream " << substreamid << ".");
00273 return false;
00274 }
00275
00276 newid = bndresp.substreamid;
00277
00278 return res;
00279
00280 }
00281
00282
00283 void XrdClientMStream::GetGoodSplitParameters(XrdClientConn *cliconn,
00284 int &spltsize, int &reqsperstream,
00285 kXR_int32 len) {
00286 spltsize = DFLT_MULTISTREAMSPLITSIZE;
00287 reqsperstream = 4;
00288
00289
00290
00291 if (cliconn->GetParallelStreamCount() > 1) {
00292
00293
00294
00295 int candlen = xrdmax(DFLT_MULTISTREAMSPLITSIZE,
00296 len / (reqsperstream * (cliconn->GetParallelStreamCount()-1)) + 1 );
00297
00298
00299
00300 if (candlen < DFLT_MULTISTREAMSPLITSIZE) {
00301 spltsize = xrdmax(DFLT_MULTISTREAMSPLITSIZE,
00302 len / (cliconn->GetParallelStreamCount()-1) + 1 );
00303 reqsperstream = 1;
00304 }
00305 else spltsize = candlen;
00306
00307 }
00308 else spltsize = len;
00309
00310
00311
00312
00313 }
00314
00315
00316
00317
00318
00319 bool XrdClientMStream::SplitReadRequest(XrdClientConn *cliconn, kXR_int64 offset, kXR_int32 len,
00320 XrdClientVector<XrdClientMStream::ReadChunk> &reqlists) {
00321
00322 int spltsize = 0;
00323 int reqsperstream = 0;
00324
00325 GetGoodSplitParameters(cliconn, spltsize, reqsperstream, len);
00326 for (kXR_int32 pp = 0; pp < len; pp += spltsize) {
00327 ReadChunk ck;
00328
00329 ck.offset = pp+offset;
00330 ck.len = xrdmin(len - pp, spltsize);
00331 ck.streamtosend = cliconn->GetParallelStreamToUse(reqsperstream);
00332
00333 reqlists.Push_back(ck);
00334
00335 }
00336
00337 return true;
00338 }