TPSocket.cxx

Go to the documentation of this file.
00001 // @(#)root/net:$Id: TPSocket.cxx 24480 2008-06-23 13:29:34Z rdm $
00002 // Author: Fons Rademakers   22/1/2001
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TPSocket                                                             //
00015 //                                                                      //
00016 // This class implements parallel client sockets. A parallel socket is  //
00017 // an endpoint for communication between two machines. It is parallel   //
00018 // because several TSockets are open at the same time to the same       //
00019 // destination. This especially speeds up communication over Big Fat    //
00020 // Pipes (i.e. high bandwidth, high latency WAN connections).           //
00021 //                                                                      //
00022 //////////////////////////////////////////////////////////////////////////
00023 
00024 #include "TPSocket.h"
00025 #include "TUrl.h"
00026 #include "TServerSocket.h"
00027 #include "TMonitor.h"
00028 #include "TSystem.h"
00029 #include "TMessage.h"
00030 #include "Bytes.h"
00031 #include "TROOT.h"
00032 #include "TError.h"
00033 #include "TVirtualMutex.h"
00034 
00035 ClassImp(TPSocket)
00036 
00037 //______________________________________________________________________________
00038 TPSocket::TPSocket(TInetAddress addr, const char *service, Int_t size,
00039                    Int_t tcpwindowsize) : TSocket(addr, service)
00040 {
00041    // Create a parallel socket. Connect to the named service at address addr.
00042    // Use tcpwindowsize to specify the size of the receive buffer, it has
00043    // to be specified here to make sure the window scale option is set (for
00044    // tcpwindowsize > 65KB and for platforms supporting window scaling).
00045    // Returns when connection has been accepted by remote side. Use IsValid()
00046    // to check the validity of the socket. Every socket is added to the TROOT
00047    // sockets list which will make sure that any open sockets are properly
00048    // closed on program termination.
00049 
00050    fSize = size;
00051    Init(tcpwindowsize);
00052 }
00053 
00054 //______________________________________________________________________________
00055 TPSocket::TPSocket(TInetAddress addr, Int_t port, Int_t size,
00056                    Int_t tcpwindowsize) : TSocket(addr, port)
00057 {
00058    // Create a parallel socket. Connect to the specified port # at address addr.
00059    // Use tcpwindowsize to specify the size of the receive buffer, it has
00060    // to be specified here to make sure the window scale option is set (for
00061    // tcpwindowsize > 65KB and for platforms supporting window scaling).
00062    // Returns when connection has been accepted by remote side. Use IsValid()
00063    // to check the validity of the socket. Every socket is added to the TROOT
00064    // sockets list which will make sure that any open sockets are properly
00065    // closed on program termination.
00066 
00067    fSize = size;
00068    Init(tcpwindowsize);
00069 }
00070 
00071 //______________________________________________________________________________
00072 TPSocket::TPSocket(const char *host, const char *service, Int_t size,
00073                    Int_t tcpwindowsize) : TSocket(host, service)
00074 {
00075    // Create a parallel socket. Connect to named service on the remote host.
00076    // Use tcpwindowsize to specify the size of the receive buffer, it has
00077    // to be specified here to make sure the window scale option is set (for
00078    // tcpwindowsize > 65KB and for platforms supporting window scaling).
00079    // Returns when connection has been accepted by remote side. Use IsValid()
00080    // to check the validity of the socket. Every socket is added to the TROOT
00081    // sockets list which will make sure that any open sockets are properly
00082    // closed on program termination.
00083 
00084    fSize = size;
00085    Init(tcpwindowsize);
00086 }
00087 
00088 //______________________________________________________________________________
00089 TPSocket::TPSocket(const char *host, Int_t port, Int_t size,
00090                    Int_t tcpwindowsize)
00091                   : TSocket(host, port, (Int_t)(size > 1 ? -1 : tcpwindowsize))
00092 {
00093    // Create a parallel socket. Connect to specified port # on the remote host.
00094    // Use tcpwindowsize to specify the size of the receive buffer, it has
00095    // to be specified here to make sure the window scale option is set (for
00096    // tcpwindowsize > 65KB and for platforms supporting window scaling).
00097    // Returns when connection has been accepted by remote side. Use IsValid()
00098    // to check the validity of the socket. Every socket is added to the TROOT
00099    // sockets list which will make sure that any open sockets are properly
00100    // closed on program termination.
00101 
00102    // To avoid uninitialization problems when Init is not called ...
00103    fSockets        = 0;
00104    fWriteMonitor   = 0;
00105    fReadMonitor    = 0;
00106    fWriteBytesLeft = 0;
00107    fReadBytesLeft  = 0;
00108    fWritePtr       = 0;
00109    fReadPtr        = 0;
00110 
00111    // set to the real value only at end (except for old servers)
00112    fSize           = 1;
00113 
00114    // to control the flow
00115    Bool_t valid = TSocket::IsValid();
00116 
00117    // check if we are called from CreateAuthSocket()
00118    Bool_t authreq = kFALSE;
00119    char *pauth = (char *)strstr(host, "?A");
00120    if (pauth) {
00121       authreq = kTRUE;
00122    }
00123 
00124    // perhaps we can use fServType here ... to be checked
00125    Bool_t rootdSrv = (strstr(host,"rootd")) ? kTRUE : kFALSE;
00126 
00127    // try authentication , if required
00128    if (authreq) {
00129       if (valid) {
00130          if (!Authenticate(TUrl(host).GetUser())) {
00131             if (rootdSrv && fRemoteProtocol < 10) {
00132                // We failed because we are talking to an old
00133                // server: we need to re-open the connection
00134                // and communicate the size first
00135                Int_t tcpw = (size > 1 ? -1 : tcpwindowsize);
00136                TSocket *ns = new TSocket(host, port, tcpw);
00137                if (ns->IsValid()) {
00138                   R__LOCKGUARD2(gROOTMutex);
00139                   gROOT->GetListOfSockets()->Remove(ns);
00140                   fSocket = ns->GetDescriptor();
00141                   fSize = size;
00142                   Init(tcpwindowsize);
00143                }
00144                if ((valid = IsValid())) {
00145                   if (!Authenticate(TUrl(host).GetUser())) {
00146                      TSocket::Close();
00147                      valid = kFALSE;
00148                   }
00149                }
00150             } else {
00151                TSocket::Close();
00152                valid = kFALSE;
00153             }
00154          }
00155       }
00156       // reset url to the original state
00157       *pauth = '\0';
00158       SetUrl(host);
00159    }
00160 
00161    // open the sockets ...
00162    if (!rootdSrv || fRemoteProtocol > 9) {
00163       if (valid) {
00164          fSize = size;
00165          Init(tcpwindowsize);
00166       }
00167    }
00168 }
00169 
00170 //______________________________________________________________________________
00171 TPSocket::TPSocket(const char *host, Int_t port, Int_t size, TSocket *sock)
00172 {
00173    // Create a parallel socket on a connection already opened via
00174    // TSocket sock.
00175    // This constructor is provided to optimize TNetFile opening when
00176    // instatiated via a call to TXNetFile.
00177    // Returns when connection has been accepted by remote side. Use IsValid()
00178    // to check the validity of the socket. Every socket is added to the TROOT
00179    // sockets list which will make sure that any open sockets are properly
00180    // closed on program termination.
00181 
00182    // To avoid uninitialization problems when Init is not called ...
00183    fSockets        = 0;
00184    fWriteMonitor   = 0;
00185    fReadMonitor    = 0;
00186    fWriteBytesLeft = 0;
00187    fReadBytesLeft  = 0;
00188    fWritePtr       = 0;
00189    fReadPtr        = 0;
00190 
00191    // set to the real value only at end (except for old servers)
00192    fSize           = 1;
00193 
00194    // We need a opened connection
00195    if (!sock) return;
00196 
00197    // Now import existing socket info
00198    fSocket         = sock->GetDescriptor();
00199    fService        = sock->GetService();
00200    fAddress        = sock->GetInetAddress();
00201    fLocalAddress   = sock->GetLocalInetAddress();
00202    fBytesSent      = sock->GetBytesSent();
00203    fBytesRecv      = sock->GetBytesRecv();
00204    fCompress       = sock->GetCompressionLevel();
00205    fSecContext     = sock->GetSecContext();
00206    fRemoteProtocol = sock->GetRemoteProtocol();
00207    fServType       = (TSocket::EServiceType)sock->GetServType();
00208    fTcpWindowSize  = sock->GetTcpWindowSize();
00209 
00210    // to control the flow
00211    Bool_t valid = sock->IsValid();
00212 
00213    // check if we are called from CreateAuthSocket()
00214    Bool_t authreq = kFALSE;
00215    char *pauth = (char *)strstr(host, "?A");
00216    if (pauth) {
00217       authreq = kTRUE;
00218    }
00219 
00220    // perhaps we can use fServType here ... to be checked
00221    Bool_t rootdSrv = (strstr(host,"rootd")) ? kTRUE : kFALSE;
00222 
00223    // try authentication , if required
00224    if (authreq) {
00225       if (valid) {
00226          if (!Authenticate(TUrl(host).GetUser())) {
00227             if (rootdSrv && fRemoteProtocol < 10) {
00228                // We failed because we are talking to an old
00229                // server: we need to re-open the connection
00230                // and communicate the size first
00231                Int_t tcpw = (size > 1 ? -1 : fTcpWindowSize);
00232                TSocket *ns = new TSocket(host, port, tcpw);
00233                if (ns->IsValid()) {
00234                   R__LOCKGUARD2(gROOTMutex);
00235                   gROOT->GetListOfSockets()->Remove(ns);
00236                   fSocket = ns->GetDescriptor();
00237                   fSize = size;
00238                   Init(fTcpWindowSize);
00239                }
00240                if ((valid = IsValid())) {
00241                   if (!Authenticate(TUrl(host).GetUser())) {
00242                      TSocket::Close();
00243                      valid = kFALSE;
00244                   }
00245                }
00246             } else {
00247                TSocket::Close();
00248                valid = kFALSE;
00249             }
00250          }
00251       }
00252       // reset url to the original state
00253       *pauth = '\0';
00254       SetUrl(host);
00255    }
00256 
00257    // open the sockets ...
00258    if (!rootdSrv || fRemoteProtocol > 9) {
00259       if (valid) {
00260          fSize = size;
00261          Init(fTcpWindowSize, sock);
00262       }
00263    }
00264 
00265    // Add to the list if everything OK
00266    if (IsValid()) {
00267       R__LOCKGUARD2(gROOTMutex);
00268       gROOT->GetListOfSockets()->Add(this);
00269    }
00270 }
00271 
00272 //______________________________________________________________________________
00273 TPSocket::TPSocket(TSocket *pSockets[], Int_t size)
00274 {
00275    // Create a parallel socket. This ctor is called by TPServerSocket.
00276 
00277    fSockets = pSockets;
00278    fSize    = size;
00279 
00280    // set descriptor if simple socket (needed when created
00281    // by TPServerSocket)
00282    if (fSize <= 1)
00283       fSocket = fSockets[0]->GetDescriptor();
00284 
00285    // set socket options (no blocking and no delay)
00286    SetOption(kNoDelay, 1);
00287    if (fSize > 1)
00288       SetOption(kNoBlock, 1);
00289 
00290    fWriteMonitor   = new TMonitor;
00291    fReadMonitor    = new TMonitor;
00292    fWriteBytesLeft = new Int_t[fSize];
00293    fReadBytesLeft  = new Int_t[fSize];
00294    fWritePtr       = new char*[fSize];
00295    fReadPtr        = new char*[fSize];
00296 
00297    for (int i = 0; i < fSize; i++) {
00298       fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
00299       fReadMonitor->Add(fSockets[i], TMonitor::kRead);
00300    }
00301    fWriteMonitor->DeActivateAll();
00302    fReadMonitor->DeActivateAll();
00303 
00304    SetName(fSockets[0]->GetName());
00305    SetTitle(fSockets[0]->GetTitle());
00306    fAddress = fSockets[0]->GetInetAddress();
00307 
00308    {
00309       R__LOCKGUARD2(gROOTMutex);
00310       gROOT->GetListOfSockets()->Add(this);
00311    }
00312 }
00313 
00314 //______________________________________________________________________________
00315 TPSocket::~TPSocket()
00316 {
00317    // Cleanup the parallel socket.
00318 
00319    Close();
00320 
00321    delete fWriteMonitor;
00322    delete fReadMonitor;
00323    delete [] fWriteBytesLeft;
00324    delete [] fReadBytesLeft;
00325    delete [] fWritePtr;
00326    delete [] fReadPtr;
00327 }
00328 
00329 //______________________________________________________________________________
00330 void TPSocket::Close(Option_t *option)
00331 {
00332    // Close a parallel socket. If option is "force", calls shutdown(id,2) to
00333    // shut down the connection. This will close the connection also
00334    // for the parent of this process. Also called via the dtor (without
00335    // option "force", call explicitely Close("force") if this is desired).
00336 
00337 
00338    if (!IsValid()) {
00339       // if closing happens too early (e.g. timeout) the underlying
00340       // socket may still be open
00341       TSocket::Close(option);
00342       return;
00343    }
00344 
00345    if (fSize <= 1) {
00346       TSocket::Close(option);
00347    } else {
00348       for (int i = 0; i < fSize; i++) {
00349          fSockets[i]->Close(option);
00350          delete fSockets[i];
00351       }
00352    }
00353    delete [] fSockets;
00354    fSockets = 0;
00355 
00356    {
00357       R__LOCKGUARD2(gROOTMutex);
00358       gROOT->GetListOfSockets()->Remove(this);
00359    }
00360 }
00361 
00362 //______________________________________________________________________________
00363 void TPSocket::Init(Int_t tcpwindowsize, TSocket *sock)
00364 {
00365    // Create a parallel socket to the specified host.
00366 
00367    fSockets        = 0;
00368    fWriteMonitor   = 0;
00369    fReadMonitor    = 0;
00370    fWriteBytesLeft = 0;
00371    fReadBytesLeft  = 0;
00372    fWritePtr       = 0;
00373    fReadPtr        = 0;
00374 
00375    if ((sock && !sock->IsValid()) || !TSocket::IsValid())
00376       return;
00377 
00378    Int_t i = 0;
00379 
00380    if (fSize <= 1) {
00381       // check if single mode
00382       fSize = 1;
00383 
00384       // set socket options (no delay)
00385       if (sock)
00386          sock->SetOption(kNoDelay, 1);
00387       else
00388          TSocket::SetOption(kNoDelay, 1);
00389 
00390       // if yes, communicate this to server
00391       // (size = 0 for backward compatibility)
00392       if (sock)
00393          sock->Send((Int_t)0, (Int_t)0);
00394       else
00395          TSocket::Send((Int_t)0, (Int_t)0);
00396 
00397       // needs to fill additional private members
00398       fSockets = new TSocket*[1];
00399       fSockets[0]= (TSocket *)this;
00400 
00401    } else {
00402 
00403       // create server that will be used to accept the parallel sockets from
00404       // the remote host, use port=0 to scan for a free port
00405       TServerSocket ss(0, kFALSE, fSize, tcpwindowsize);
00406 
00407       // send the local port number of the just created server socket and the
00408       // number of desired parallel sockets
00409       if (sock)
00410          sock->Send(ss.GetLocalPort(), fSize);
00411       else
00412          TSocket::Send(ss.GetLocalPort(), fSize);
00413 
00414       fSockets = new TSocket*[fSize];
00415 
00416       // establish fSize parallel socket connections between client and server
00417       for (i = 0; i < fSize; i++) {
00418          fSockets[i] = ss.Accept();
00419          R__LOCKGUARD2(gROOTMutex);
00420          gROOT->GetListOfSockets()->Remove(fSockets[i]);
00421       }
00422 
00423       // set socket options (no blocking and no delay)
00424       SetOption(kNoDelay, 1);
00425       SetOption(kNoBlock, 1);
00426 
00427       // close original socket
00428       if (sock)
00429          sock->Close();
00430       else
00431          gSystem->CloseConnection(fSocket, kFALSE);
00432       fSocket = -1;
00433    }
00434 
00435    fWriteMonitor   = new TMonitor;
00436    fReadMonitor    = new TMonitor;
00437    fWriteBytesLeft = new Int_t[fSize];
00438    fReadBytesLeft  = new Int_t[fSize];
00439    fWritePtr       = new char*[fSize];
00440    fReadPtr        = new char*[fSize];
00441 
00442    for (i = 0; i < fSize; i++) {
00443       fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
00444       fReadMonitor->Add(fSockets[i], TMonitor::kRead);
00445    }
00446    fWriteMonitor->DeActivateAll();
00447    fReadMonitor->DeActivateAll();
00448 }
00449 
00450 //______________________________________________________________________________
00451 TInetAddress TPSocket::GetLocalInetAddress()
00452 {
00453    // Return internet address of local host to which the socket is bound.
00454    // In case of error TInetAddress::IsValid() returns kFALSE.
00455 
00456    if (fSize<= 1)
00457       return TSocket::GetLocalInetAddress();
00458 
00459    if (IsValid()) {
00460       if (fLocalAddress.GetPort() == -1)
00461          fLocalAddress = gSystem->GetSockName(fSockets[0]->GetDescriptor());
00462       return fLocalAddress;
00463    }
00464    return TInetAddress();
00465 }
00466 
00467 //______________________________________________________________________________
00468 Int_t TPSocket::GetDescriptor() const
00469 {
00470    // Return socket descriptor
00471 
00472    if (fSize <= 1)
00473       return TSocket::GetDescriptor();
00474 
00475    return fSockets ? fSockets[0]->GetDescriptor() : -1;
00476 
00477 }
00478 
00479 //______________________________________________________________________________
00480 Int_t TPSocket::Send(const TMessage &mess)
00481 {
00482    // Send a TMessage object. Returns the number of bytes in the TMessage
00483    // that were sent and -1 in case of error. In case the TMessage::What
00484    // has been or'ed with kMESS_ACK, the call will only return after having
00485    // received an acknowledgement, making the sending process synchronous.
00486    // Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
00487 
00488    if (!fSockets || fSize <= 1)
00489       return TSocket::Send(mess);  // only the case when called via Init()
00490 
00491    if (!IsValid()) {
00492       return -1;
00493    }
00494 
00495    if (mess.IsReading()) {
00496       Error("Send", "cannot send a message used for reading");
00497       return -1;
00498    }
00499 
00500    // send streamer infos in case schema evolution is enabled in the TMessage
00501    SendStreamerInfos(mess);
00502 
00503    // send the process id's so TRefs work
00504    SendProcessIDs(mess);
00505 
00506    mess.SetLength();   //write length in first word of buffer
00507 
00508    if (fCompress > 0 && mess.GetCompressionLevel() == 0)
00509       const_cast<TMessage&>(mess).SetCompressionLevel(fCompress);
00510 
00511    if (mess.GetCompressionLevel() > 0)
00512       const_cast<TMessage&>(mess).Compress();
00513 
00514    char *mbuf = mess.Buffer();
00515    Int_t mlen = mess.Length();
00516    if (mess.CompBuffer()) {
00517       mbuf = mess.CompBuffer();
00518       mlen = mess.CompLength();
00519    }
00520 
00521    Int_t nsent, ulen = (Int_t) sizeof(UInt_t);
00522    // send length
00523    if ((nsent = SendRaw(mbuf, ulen, kDefault)) <= 0)
00524       return nsent;
00525 
00526    // send buffer (this might go in parallel)
00527    if ((nsent = SendRaw(mbuf+ulen, mlen-ulen, kDefault)) <= 0)
00528       return nsent;
00529 
00530    // if acknowledgement is desired, wait for it
00531    if (mess.What() & kMESS_ACK) {
00532       char buf[2];
00533       if (RecvRaw(buf, sizeof(buf), kDefault) < 0)
00534          return -1;
00535       if (strncmp(buf, "ok", 2)) {
00536          Error("Send", "bad acknowledgement");
00537          return -1;
00538       }
00539    }
00540 
00541    return nsent;  //length - length header
00542 }
00543 
00544 //______________________________________________________________________________
00545 Int_t TPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
00546 {
00547    // Send a raw buffer of specified length. Returns the number of bytes
00548    // send and -1 in case of error.
00549 
00550    if (fSize == 1)
00551       return TSocket::SendRaw(buffer,length,opt);
00552 
00553    if (!fSockets) return -1;
00554 
00555    // if data buffer size < 4K use only one socket
00556    Int_t i, nsocks = fSize, len = length;
00557    if (len < 4096)
00558       nsocks = 1;
00559 
00560    ESendRecvOptions sendopt = kDontBlock;
00561    if (nsocks == 1)
00562       sendopt = kDefault;
00563 
00564    if (opt != kDefault) {
00565       nsocks  = 1;
00566       sendopt = opt;
00567    }
00568 
00569    if (nsocks == 1)
00570       fSockets[0]->SetOption(kNoBlock, 0);
00571    else
00572       fSockets[0]->SetOption(kNoBlock, 1);
00573 
00574    // setup pointer appropriately for transferring data equally on the
00575    // parallel sockets
00576    for (i = 0; i < nsocks; i++) {
00577       fWriteBytesLeft[i] = len/nsocks;
00578       fWritePtr[i] = (char *)buffer + (i*fWriteBytesLeft[i]);
00579       fWriteMonitor->Activate(fSockets[i]);
00580    }
00581    fWriteBytesLeft[nsocks-1] += len%nsocks;
00582 
00583    // send the data on the parallel sockets
00584    while (len > 0) {
00585       TSocket *s = fWriteMonitor->Select();
00586       for (int is = 0; is < nsocks; is++) {
00587          if (s == fSockets[is]) {
00588             if (fWriteBytesLeft[is] > 0) {
00589                Int_t nsent;
00590 again:
00591                if ((nsent = fSockets[is]->SendRaw(fWritePtr[is],
00592                                                   fWriteBytesLeft[is],
00593                                                   sendopt)) <= 0) {
00594                   if (nsent == -4) {
00595                      // got EAGAIN/EWOULDBLOCK error, keep trying...
00596                      goto again;
00597                   }
00598                   fWriteMonitor->DeActivateAll();
00599                   if (nsent == -5) {
00600                      // connection reset by peer or broken ...
00601                      Close();
00602                   }
00603                   return -1;
00604                }
00605                if (opt == kDontBlock) {
00606                   fWriteMonitor->DeActivateAll();
00607                   return nsent;
00608                }
00609                fWriteBytesLeft[is] -= nsent;
00610                fWritePtr[is] += nsent;
00611                len -= nsent;
00612             }
00613          }
00614       }
00615    }
00616    fWriteMonitor->DeActivateAll();
00617 
00618    return length;
00619 }
00620 
00621 //______________________________________________________________________________
00622 Int_t TPSocket::Recv(TMessage *&mess)
00623 {
00624    // Receive a TMessage object. The user must delete the TMessage object.
00625    // Returns length of message in bytes (can be 0 if other side of connection
00626    // is closed) or -1 in case of error or -4 in case a non-blocking socket would
00627    // block (i.e. there is nothing to be read). In those case mess == 0.
00628 
00629    if (fSize <= 1)
00630       return TSocket::Recv(mess);
00631 
00632    if (!IsValid()) {
00633       mess = 0;
00634       return -1;
00635    }
00636 
00637 oncemore:
00638    Int_t  n;
00639    UInt_t len;
00640    if ((n = RecvRaw(&len, sizeof(UInt_t), kDefault)) <= 0) {
00641       mess = 0;
00642       return n;
00643    }
00644    len = net2host(len);  //from network to host byte order
00645 
00646    char *buf = new char[len+sizeof(UInt_t)];
00647    if ((n = RecvRaw(buf+sizeof(UInt_t), len, kDefault)) <= 0) {
00648       delete [] buf;
00649       mess = 0;
00650       return n;
00651    }
00652 
00653    mess = new TMessage(buf, len+sizeof(UInt_t));
00654 
00655    // receive any streamer infos
00656    if (RecvStreamerInfos(mess))
00657       goto oncemore;
00658 
00659    // receive any process ids
00660    if (RecvProcessIDs(mess))
00661       goto oncemore;
00662 
00663    if (mess->What() & kMESS_ACK) {
00664       char ok[2] = { 'o', 'k' };
00665       if (SendRaw(ok, sizeof(ok), kDefault) < 0) {
00666          delete mess;
00667          mess = 0;
00668          return -1;
00669       }
00670       mess->SetWhat(mess->What() & ~kMESS_ACK);
00671    }
00672 
00673    return n;
00674 }
00675 
00676 //______________________________________________________________________________
00677 Int_t TPSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt)
00678 {
00679    // Send a raw buffer of specified length. Returns the number of bytes
00680    // sent or -1 in case of error.
00681 
00682    if (fSize <= 1)
00683       return TSocket::RecvRaw(buffer,length,opt);
00684 
00685    if (!fSockets) return -1;
00686 
00687    // if data buffer size < 4K use only one socket
00688    Int_t i, nsocks = fSize, len = length;
00689    if (len < 4096)
00690       nsocks = 1;
00691 
00692    ESendRecvOptions recvopt = kDontBlock;
00693    if (nsocks == 1)
00694       recvopt = kDefault;
00695 
00696    if (opt != kDefault) {
00697       nsocks  = 1;
00698       recvopt = opt;
00699    }
00700 
00701    if (nsocks == 1)
00702       fSockets[0]->SetOption(kNoBlock, 0);
00703    else
00704       fSockets[0]->SetOption(kNoBlock, 1);
00705 
00706    // setup pointer appropriately for transferring data equally on the
00707    // parallel sockets
00708    for (i = 0; i < nsocks; i++) {
00709       fReadBytesLeft[i] = len/nsocks;
00710       fReadPtr[i] = (char *)buffer + (i*fReadBytesLeft[i]);
00711       fReadMonitor->Activate(fSockets[i]);
00712    }
00713    fReadBytesLeft[nsocks-1] += len%nsocks;
00714 
00715    // start receiving data on all sockets. Receive data as and when
00716    // they are available on a socket by by using select.
00717    // Exit the loop as soon as all data has been received.
00718    while (len > 0) {
00719       TSocket *s = fReadMonitor->Select();
00720       for (int is = 0; is < nsocks; is++) {
00721          if (s == fSockets[is]) {
00722             if (fReadBytesLeft[is] > 0) {
00723                Int_t nrecv;
00724                if ((nrecv = fSockets[is]->RecvRaw(fReadPtr[is],
00725                                                   fReadBytesLeft[is],
00726                                                   recvopt)) <= 0) {
00727                   fReadMonitor->DeActivateAll();
00728                   if (nrecv == -5) {
00729                      // connection reset by peer or broken ...
00730                      Close();
00731                   }
00732                   return -1;
00733                }
00734                if (opt == kDontBlock) {
00735                   fReadMonitor->DeActivateAll();
00736                   return nrecv;
00737                }
00738                fReadBytesLeft[is] -= nrecv;
00739                fReadPtr[is] += nrecv;
00740                len -= nrecv;
00741             }
00742          }
00743       }
00744    }
00745    fReadMonitor->DeActivateAll();
00746 
00747    return length;
00748 }
00749 
00750 //______________________________________________________________________________
00751 Int_t TPSocket::SetOption(ESockOptions opt, Int_t val)
00752 {
00753    // Set socket options.
00754 
00755    if (fSize <= 1)
00756       return TSocket::SetOption(opt,val);
00757 
00758    Int_t ret = 0;
00759    for (int i = 0; i < fSize; i++)
00760       ret = fSockets[i]->SetOption(opt, val);
00761    return ret;
00762 }
00763 
00764 //______________________________________________________________________________
00765 Int_t TPSocket::GetOption(ESockOptions opt, Int_t &val)
00766 {
00767    // Get socket options. Returns -1 in case of error.
00768 
00769    if (fSize <= 1)
00770       return TSocket::GetOption(opt,val);
00771 
00772    Int_t ret = 0;
00773    for (int i = 0; i < fSize; i++)
00774       ret = fSockets[i]->GetOption(opt, val);
00775    return ret;
00776 }
00777 
00778 //______________________________________________________________________________
00779 Int_t TPSocket::GetErrorCode() const
00780 {
00781    // Returns error code. Meaning depends on context where it is called.
00782    // If no error condition returns 0 else a value < 0.
00783 
00784    if (fSize <= 1)
00785       return TSocket::GetErrorCode();
00786 
00787    return fSockets[0] ? fSockets[0]->GetErrorCode() : 0;
00788 }

Generated on Tue Jul 5 14:46:11 2011 for ROOT_528-00b_version by  doxygen 1.5.1