XrdClientMStream.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdClientMStream                                                     //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (INFN Padova, 2006)                          //
00006 //                                                                      //
00007 // Helper code for XrdClient to handle multistream behavior             //
00008 // Functionalities dealing with                                         //
00009 //  mstream creation on init                                            //
00010 //  decisions to add/remove one                                         //
00011 //                                                                      //
00012 //////////////////////////////////////////////////////////////////////////
00013 
00014 //         $Id: XrdClientMStream.cc 35060 2010-08-29 08:39:10Z brun $
00015 
00016 const char *XrdClientMStreamCVSID = "$Id: XrdClientMStream.cc 35060 2010-08-29 08:39:10Z brun $";
00017 
00018 
00019 #include "XrdClient/XrdClientMStream.hh"
00020 #include "XrdClient/XrdClientLogConnection.hh"
00021 #include "XrdClient/XrdClientEnv.hh"
00022 #include "XrdClient/XrdClientDebug.hh"
00023 
// Base value of the temporary socket ids used while a parallel stream is
// being set up (EstablishParallelStreams hands out XRDCLI_PSOCKTEMP - i).
// This has to be a socket id pool which the server will never assign by itself.
// Moreover, socketids are local to an instance of XrdClientPSock.
// The value is parenthesized so the macro is safe inside any expression.
#define XRDCLI_PSOCKTEMP (-1000)
00027 
00028 struct ParStreamOpenerArgs {
00029    XrdClientThread *thr;
00030    XrdClientConn *cliconn;
00031    int wan_port, wan_window;
00032    int tmpid;
00033 };
00034 
00035 //_____________________________________________________________________________
00036 void *ParStreamOpenerThread(void *arg, XrdClientThread *thr)
00037 {
00038    // This one just opens a new stream
00039 
00040    // Mask all allowed signals
00041    if (thr->MaskSignal(0) != 0)
00042       Error("ParStreamOpenerThread", "Warning: problems masking signals");
00043 
00044    ParStreamOpenerArgs *parms = (ParStreamOpenerArgs *)arg;
00045 
00046    XrdClientMStream::AddParallelStream(parms->cliconn, parms->wan_port, parms->wan_window, parms->tmpid);
00047 
00048    return 0;
00049 }
00050 
00051 
00052 int XrdClientMStream::EstablishParallelStreams(XrdClientConn *cliconn) {
00053     int mx = EnvGetLong(NAME_MULTISTREAMCNT);
00054     int i, res;
00055     int wan_port = 0, wan_window = 0;
00056 
00057     if (mx <= 1) return 1;
00058     if (cliconn->GetServerType() == kSTBaseXrootd) return 1;
00059 
00060     // Get the XrdClientPhyconn to be used
00061     XrdClientPhyConnection *phyconn = XrdClientConn::GetPhyConn(cliconn->GetLogConnID());
00062     if (!phyconn) return 0;
00063     
00064     // For a given phyconn we allow only one single attempt to establish multiple streams
00065     // Any other thread or subsequent attempt will exit
00066     if (phyconn->TestAndSetMStreamsGoing()) return 1;
00067 
00068     // Query the server config, for the WAN port and the windowsize
00069     char *qryitems = (char *)"wan_port wan_window";
00070     ClientRequest qryRequest;
00071     char qryResp[1024];
00072     memset( &qryRequest, 0, sizeof(qryRequest) );
00073     memset( qryResp, 0, 1024 );
00074 
00075     cliconn->SetSID(qryRequest.header.streamid);
00076     qryRequest.header.requestid = kXR_query;
00077     qryRequest.query.infotype = kXR_Qconfig;
00078     qryRequest.header.dlen = strlen(qryitems);
00079 
00080     res =  cliconn->SendGenCommand(&qryRequest, qryitems, 0, qryResp,
00081                                    false, (char *)"QueryConfig");
00082 
00083     if (res && (cliconn->LastServerResp.status == kXR_ok) &&
00084         cliconn->LastServerResp.dlen) {
00085 
00086       sscanf(qryResp, "%d\n%d",
00087              &wan_port,
00088              &wan_window);
00089 
00090       Info(XrdClientDebug::kUSERDEBUG,
00091            "XrdClientMStream::EstablishParallelStreams", "Server WAN parameters: port=" << wan_port << " windowsize=" << wan_window );
00092     }
00093 
00094     // Start the whole bunch of asynchronous connection requests
00095     // By starting one thread for each, calling AddParallelStream once
00096     // If no more threads are available, wait and retry
00097 
00098     ParStreamOpenerArgs paropeners[16];
00099     for (i = 0; i < mx; i++) {
00100        paropeners[i].thr = 0;
00101        paropeners[i].cliconn = cliconn;
00102        paropeners[i].wan_port = wan_port;
00103        paropeners[i].wan_window = wan_window;
00104        paropeners[i].tmpid = 0;
00105     }
00106 
00107     for (i = 0; i < mx; i++) {
00108         Info(XrdClientDebug::kHIDEBUG,
00109              "XrdClientMStream::EstablishParallelStreams", "Trying to establish " << i+1 << "th substream." );
00110 
00111         paropeners[i].thr = new XrdClientThread(ParStreamOpenerThread);
00112         if (paropeners[i].thr) {
00113            paropeners[i].tmpid = XRDCLI_PSOCKTEMP - i;
00114            if (paropeners[i].thr->Run(&paropeners[i])) {
00115               Error("XrdClientMStream::EstablishParallelStreams", "Error establishing " << i+1 << "th substream. Thread start failed.");
00116               delete paropeners[i].thr;
00117               paropeners[i].thr = 0;
00118               break; 
00119            }
00120         }
00121 
00122     }
00123 
00124     for (i = 0; i < mx; i++)
00125        if (paropeners[i].thr) {
00126           Info(XrdClientDebug::kHIDEBUG,
00127              "XrdClientMStream::EstablishParallelStreams", "Waiting for substream " << i+1 << "." );
00128           paropeners[i].thr->Join(0);
00129           delete paropeners[i].thr;
00130        }
00131 
00132         // If something goes wrong, stop adding new streams
00133         //if (AddParallelStream(cliconn, wan_port, wan_window, XRDCLI_PSOCKTEMP - i))
00134         //    break;
00135 
00136 
00137     Info(XrdClientDebug::kHIDEBUG,
00138          "XrdClientMStream::EstablishParallelStreams", "Parallel streams establishment finished." );
00139 
00140     return i;
00141 }
00142 
00143 // Add a parallel stream to the pool used by the given client inst
00144 // Returns 0 if ok
00145 int XrdClientMStream::AddParallelStream(XrdClientConn *cliconn, int port, int windowsz, int tempid) {
00146     // Get the XrdClientPhyconn to be used
00147     XrdClientPhyConnection *phyconn = XrdClientConn::GetPhyConn(cliconn->GetLogConnID());
00148 
00149 
00150     // If the phyconn already has all the needed streams... exit
00151     if (phyconn->GetSockIdCount() > EnvGetLong(NAME_MULTISTREAMCNT)) return 0;
00152 
00153     // Connect a new connection, set the temp socket id and get the descriptor
00154     // Temporary means that we need one to communicate, but its final id
00155     // will be given by the server
00156     int sockdescr = phyconn->TryConnectParallelStream(port, windowsz, tempid);
00157 
00158     if (sockdescr < 0) return -1;
00159 
00160     // The connection now is here but has not yet to be considered by the reader threads
00161     // before having handshaked it, and this has to be sync man
00162     // Do the handshake
00163     ServerInitHandShake xbody;
00164     if (phyconn->DoHandShake(xbody, tempid) == kSTError) return -1;
00165 
00166     // Send the kxr_bind req to get a new substream id, going to be the final one
00167     int newid = -1;
00168     int res = -1;
00169     if (BindPendingStream(cliconn, tempid, newid) &&
00170         phyconn->IsValid() ) {
00171       
00172         // Everything ok, Establish the new connection with the new id
00173         res = phyconn->EstablishPendingParallelStream(tempid, newid);
00174     
00175         if (res) {
00176             // If the establish failed we have to remove the pending stream
00177             RemoveParallelStream(cliconn, tempid);
00178             return res;
00179         }
00180 
00181         // After everything make the reader thread aware of the new stream
00182         phyconn->UnBanSockDescr(sockdescr);
00183         phyconn->ReinitFDTable();
00184     
00185     }
00186     else {
00187         // If the bind failed we have to remove the pending stream
00188         RemoveParallelStream(cliconn, tempid);
00189         return -1;
00190     }
00191 
00192     Info(XrdClientDebug::kHIDEBUG,
00193          "XrdClientMStream::EstablishParallelStreams", "Substream added." );
00194     return 0;
00195 
00196 }
00197 
00198 // Remove a parallel stream to the pool used by the given client inst
00199 int XrdClientMStream::RemoveParallelStream(XrdClientConn *cliconn, int substream) {
00200 
00201   // Get the XrdClientPhyconn to be used
00202   XrdClientLogConnection *log = ConnectionManager->GetConnection(cliconn->GetLogConnID());
00203   if (!log) return 0;
00204 
00205   XrdClientPhyConnection *phyconn = log->GetPhyConnection();
00206     
00207   if (phyconn) 
00208     phyconn->RemoveParallelStream(substream);
00209     
00210   return 0;
00211     
00212 }
00213 
00214 
00215 
00216 // Binds the pending temporary parallel stream to the current session
00217 // Returns the substreamid assigned by the server into newid
00218 bool XrdClientMStream::BindPendingStream(XrdClientConn *cliconn, int substreamid, int &newid) {
00219     bool res = false;
00220 
00221     // Prepare request
00222     ClientRequest bindFileRequest;
00223     XrdClientConn::SessionIDInfo sess;
00224     ServerResponseBody_Bind bndresp;
00225 
00226     // Get the XrdClientPhyconn to be used
00227     XrdClientPhyConnection *phyconn =
00228         ConnectionManager->GetConnection(cliconn->GetLogConnID())->GetPhyConnection();
00229 
00230     cliconn->GetSessionID(sess);
00231 
00232     memset( &bindFileRequest, 0, sizeof(bindFileRequest) );
00233     cliconn->SetSID(bindFileRequest.header.streamid);
00234     bindFileRequest.bind.requestid = kXR_bind;
00235     memcpy( bindFileRequest.bind.sessid, sess.id, sizeof(sess.id) );
00236 
00237     // The request has to be sent through the stream which has to be bound!
00238     clientMarshall(&bindFileRequest);
00239     res = phyconn->WriteRaw(&bindFileRequest, sizeof(bindFileRequest), substreamid);
00240 
00241     if (!res) return false;
00242 
00243     ServerResponseHeader hdr;
00244     int rdres = 0;
00245 
00246     // Now wait for the header, on the same substream
00247     rdres = phyconn->ReadRaw(&hdr, sizeof(ServerResponseHeader), substreamid);
00248 
00249     if (rdres < (int)sizeof(ServerResponseHeader)) {
00250        Error("BindPendingStream", "Error reading bind response header for substream " << substreamid << ".");
00251        return false;
00252     }
00253 
00254     clientUnmarshall(&hdr);
00255 
00256     // Now wait for the response data, if any
00257     // This code is specialized.
00258     // If the answer is not what we were expecting, just return false,
00259     //  expecting that this connection will be shut down
00260     if (hdr.status != kXR_ok) {
00261        Error("BindPendingStream", "Server denied binding for substream " << substreamid << ".");
00262        return false;
00263     }
00264 
00265     if (hdr.dlen != sizeof(bndresp)) {
00266        Error("BindPendingStream", "Unrecognized response datalen binding substream " << substreamid << ".");
00267        return false;
00268     }
00269 
00270     rdres = phyconn->ReadRaw(&bndresp, sizeof(bndresp), substreamid);
00271     if (rdres != sizeof(bndresp)) {
00272        Error("BindPendingStream", "Error reading response binding substream " << substreamid << ".");
00273        return false;
00274     }
00275 
00276     newid = bndresp.substreamid;
00277 
00278     return res;
00279 
00280 }
00281 
00282 
00283 void XrdClientMStream::GetGoodSplitParameters(XrdClientConn *cliconn,
00284                                               int &spltsize, int &reqsperstream,
00285                                               kXR_int32 len) {
00286   spltsize = DFLT_MULTISTREAMSPLITSIZE;
00287   reqsperstream = 4;
00288 
00289 
00290   // Let's try to distribute the load into maximum sized chunks
00291   if (cliconn->GetParallelStreamCount() > 1) {
00292     
00293     // We start seeing which length we get trying to fill all the
00294     // available slots ( per stream)
00295     int candlen = xrdmax(DFLT_MULTISTREAMSPLITSIZE,
00296                          len / (reqsperstream * (cliconn->GetParallelStreamCount()-1)) + 1 );
00297     
00298     // We don't want blocks smaller than a min value
00299     // If this is the case we consider only one slot per stream
00300     if (candlen < DFLT_MULTISTREAMSPLITSIZE) {
00301       spltsize = xrdmax(DFLT_MULTISTREAMSPLITSIZE,
00302                         len / (cliconn->GetParallelStreamCount()-1) + 1 );
00303       reqsperstream = 1;
00304     }
00305     else spltsize = candlen;
00306     
00307   }
00308   else spltsize = len;
00309 
00310   //cout << "parstreams: " << cliconn->GetParallelStreamCount() <<
00311   // " len: " << len << " splitsize: " << spltsize << " reqsperstream: " <<
00312   // reqsperstream << endl << endl;
00313 }
00314 
00315 
00316 // This splits a long requests into many smaller requests, to be sent in parallel
00317 //  through multiple streams
00318 // Returns false if the chunk is not worth splitting
00319 bool XrdClientMStream::SplitReadRequest(XrdClientConn *cliconn, kXR_int64 offset, kXR_int32 len,
00320                              XrdClientVector<XrdClientMStream::ReadChunk> &reqlists) {
00321 
00322     int spltsize = 0;
00323     int reqsperstream = 0;
00324 
00325     GetGoodSplitParameters(cliconn, spltsize, reqsperstream, len);
00326     for (kXR_int32 pp = 0; pp < len; pp += spltsize) {
00327       ReadChunk ck;
00328 
00329       ck.offset = pp+offset;
00330       ck.len = xrdmin(len - pp, spltsize);
00331       ck.streamtosend = cliconn->GetParallelStreamToUse(reqsperstream);
00332 
00333       reqlists.Push_back(ck);
00334 
00335     }
00336 
00337     return true;
00338 }

Generated on Tue Jul 5 14:46:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1