00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "TPSocket.h"
00025 #include "TUrl.h"
00026 #include "TServerSocket.h"
00027 #include "TMonitor.h"
00028 #include "TSystem.h"
00029 #include "TMessage.h"
00030 #include "Bytes.h"
00031 #include "TROOT.h"
00032 #include "TError.h"
00033 #include "TVirtualMutex.h"
00034
00035 ClassImp(TPSocket)
00036
00037
00038 TPSocket::TPSocket(TInetAddress addr, const char *service, Int_t size,
00039 Int_t tcpwindowsize) : TSocket(addr, service)
00040 {
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 fSize = size;
00051 Init(tcpwindowsize);
00052 }
00053
00054
00055 TPSocket::TPSocket(TInetAddress addr, Int_t port, Int_t size,
00056 Int_t tcpwindowsize) : TSocket(addr, port)
00057 {
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067 fSize = size;
00068 Init(tcpwindowsize);
00069 }
00070
00071
00072 TPSocket::TPSocket(const char *host, const char *service, Int_t size,
00073 Int_t tcpwindowsize) : TSocket(host, service)
00074 {
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084 fSize = size;
00085 Init(tcpwindowsize);
00086 }
00087
00088
00089 TPSocket::TPSocket(const char *host, Int_t port, Int_t size,
00090 Int_t tcpwindowsize)
00091 : TSocket(host, port, (Int_t)(size > 1 ? -1 : tcpwindowsize))
00092 {
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103 fSockets = 0;
00104 fWriteMonitor = 0;
00105 fReadMonitor = 0;
00106 fWriteBytesLeft = 0;
00107 fReadBytesLeft = 0;
00108 fWritePtr = 0;
00109 fReadPtr = 0;
00110
00111
00112 fSize = 1;
00113
00114
00115 Bool_t valid = TSocket::IsValid();
00116
00117
00118 Bool_t authreq = kFALSE;
00119 char *pauth = (char *)strstr(host, "?A");
00120 if (pauth) {
00121 authreq = kTRUE;
00122 }
00123
00124
00125 Bool_t rootdSrv = (strstr(host,"rootd")) ? kTRUE : kFALSE;
00126
00127
00128 if (authreq) {
00129 if (valid) {
00130 if (!Authenticate(TUrl(host).GetUser())) {
00131 if (rootdSrv && fRemoteProtocol < 10) {
00132
00133
00134
00135 Int_t tcpw = (size > 1 ? -1 : tcpwindowsize);
00136 TSocket *ns = new TSocket(host, port, tcpw);
00137 if (ns->IsValid()) {
00138 R__LOCKGUARD2(gROOTMutex);
00139 gROOT->GetListOfSockets()->Remove(ns);
00140 fSocket = ns->GetDescriptor();
00141 fSize = size;
00142 Init(tcpwindowsize);
00143 }
00144 if ((valid = IsValid())) {
00145 if (!Authenticate(TUrl(host).GetUser())) {
00146 TSocket::Close();
00147 valid = kFALSE;
00148 }
00149 }
00150 } else {
00151 TSocket::Close();
00152 valid = kFALSE;
00153 }
00154 }
00155 }
00156
00157 *pauth = '\0';
00158 SetUrl(host);
00159 }
00160
00161
00162 if (!rootdSrv || fRemoteProtocol > 9) {
00163 if (valid) {
00164 fSize = size;
00165 Init(tcpwindowsize);
00166 }
00167 }
00168 }
00169
00170
00171 TPSocket::TPSocket(const char *host, Int_t port, Int_t size, TSocket *sock)
00172 {
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183 fSockets = 0;
00184 fWriteMonitor = 0;
00185 fReadMonitor = 0;
00186 fWriteBytesLeft = 0;
00187 fReadBytesLeft = 0;
00188 fWritePtr = 0;
00189 fReadPtr = 0;
00190
00191
00192 fSize = 1;
00193
00194
00195 if (!sock) return;
00196
00197
00198 fSocket = sock->GetDescriptor();
00199 fService = sock->GetService();
00200 fAddress = sock->GetInetAddress();
00201 fLocalAddress = sock->GetLocalInetAddress();
00202 fBytesSent = sock->GetBytesSent();
00203 fBytesRecv = sock->GetBytesRecv();
00204 fCompress = sock->GetCompressionLevel();
00205 fSecContext = sock->GetSecContext();
00206 fRemoteProtocol = sock->GetRemoteProtocol();
00207 fServType = (TSocket::EServiceType)sock->GetServType();
00208 fTcpWindowSize = sock->GetTcpWindowSize();
00209
00210
00211 Bool_t valid = sock->IsValid();
00212
00213
00214 Bool_t authreq = kFALSE;
00215 char *pauth = (char *)strstr(host, "?A");
00216 if (pauth) {
00217 authreq = kTRUE;
00218 }
00219
00220
00221 Bool_t rootdSrv = (strstr(host,"rootd")) ? kTRUE : kFALSE;
00222
00223
00224 if (authreq) {
00225 if (valid) {
00226 if (!Authenticate(TUrl(host).GetUser())) {
00227 if (rootdSrv && fRemoteProtocol < 10) {
00228
00229
00230
00231 Int_t tcpw = (size > 1 ? -1 : fTcpWindowSize);
00232 TSocket *ns = new TSocket(host, port, tcpw);
00233 if (ns->IsValid()) {
00234 R__LOCKGUARD2(gROOTMutex);
00235 gROOT->GetListOfSockets()->Remove(ns);
00236 fSocket = ns->GetDescriptor();
00237 fSize = size;
00238 Init(fTcpWindowSize);
00239 }
00240 if ((valid = IsValid())) {
00241 if (!Authenticate(TUrl(host).GetUser())) {
00242 TSocket::Close();
00243 valid = kFALSE;
00244 }
00245 }
00246 } else {
00247 TSocket::Close();
00248 valid = kFALSE;
00249 }
00250 }
00251 }
00252
00253 *pauth = '\0';
00254 SetUrl(host);
00255 }
00256
00257
00258 if (!rootdSrv || fRemoteProtocol > 9) {
00259 if (valid) {
00260 fSize = size;
00261 Init(fTcpWindowSize, sock);
00262 }
00263 }
00264
00265
00266 if (IsValid()) {
00267 R__LOCKGUARD2(gROOTMutex);
00268 gROOT->GetListOfSockets()->Add(this);
00269 }
00270 }
00271
00272
00273 TPSocket::TPSocket(TSocket *pSockets[], Int_t size)
00274 {
00275
00276
00277 fSockets = pSockets;
00278 fSize = size;
00279
00280
00281
00282 if (fSize <= 1)
00283 fSocket = fSockets[0]->GetDescriptor();
00284
00285
00286 SetOption(kNoDelay, 1);
00287 if (fSize > 1)
00288 SetOption(kNoBlock, 1);
00289
00290 fWriteMonitor = new TMonitor;
00291 fReadMonitor = new TMonitor;
00292 fWriteBytesLeft = new Int_t[fSize];
00293 fReadBytesLeft = new Int_t[fSize];
00294 fWritePtr = new char*[fSize];
00295 fReadPtr = new char*[fSize];
00296
00297 for (int i = 0; i < fSize; i++) {
00298 fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
00299 fReadMonitor->Add(fSockets[i], TMonitor::kRead);
00300 }
00301 fWriteMonitor->DeActivateAll();
00302 fReadMonitor->DeActivateAll();
00303
00304 SetName(fSockets[0]->GetName());
00305 SetTitle(fSockets[0]->GetTitle());
00306 fAddress = fSockets[0]->GetInetAddress();
00307
00308 {
00309 R__LOCKGUARD2(gROOTMutex);
00310 gROOT->GetListOfSockets()->Add(this);
00311 }
00312 }
00313
00314
00315 TPSocket::~TPSocket()
00316 {
00317
00318
00319 Close();
00320
00321 delete fWriteMonitor;
00322 delete fReadMonitor;
00323 delete [] fWriteBytesLeft;
00324 delete [] fReadBytesLeft;
00325 delete [] fWritePtr;
00326 delete [] fReadPtr;
00327 }
00328
00329
00330 void TPSocket::Close(Option_t *option)
00331 {
00332
00333
00334
00335
00336
00337
00338 if (!IsValid()) {
00339
00340
00341 TSocket::Close(option);
00342 return;
00343 }
00344
00345 if (fSize <= 1) {
00346 TSocket::Close(option);
00347 } else {
00348 for (int i = 0; i < fSize; i++) {
00349 fSockets[i]->Close(option);
00350 delete fSockets[i];
00351 }
00352 }
00353 delete [] fSockets;
00354 fSockets = 0;
00355
00356 {
00357 R__LOCKGUARD2(gROOTMutex);
00358 gROOT->GetListOfSockets()->Remove(this);
00359 }
00360 }
00361
00362
00363 void TPSocket::Init(Int_t tcpwindowsize, TSocket *sock)
00364 {
00365
00366
00367 fSockets = 0;
00368 fWriteMonitor = 0;
00369 fReadMonitor = 0;
00370 fWriteBytesLeft = 0;
00371 fReadBytesLeft = 0;
00372 fWritePtr = 0;
00373 fReadPtr = 0;
00374
00375 if ((sock && !sock->IsValid()) || !TSocket::IsValid())
00376 return;
00377
00378 Int_t i = 0;
00379
00380 if (fSize <= 1) {
00381
00382 fSize = 1;
00383
00384
00385 if (sock)
00386 sock->SetOption(kNoDelay, 1);
00387 else
00388 TSocket::SetOption(kNoDelay, 1);
00389
00390
00391
00392 if (sock)
00393 sock->Send((Int_t)0, (Int_t)0);
00394 else
00395 TSocket::Send((Int_t)0, (Int_t)0);
00396
00397
00398 fSockets = new TSocket*[1];
00399 fSockets[0]= (TSocket *)this;
00400
00401 } else {
00402
00403
00404
00405 TServerSocket ss(0, kFALSE, fSize, tcpwindowsize);
00406
00407
00408
00409 if (sock)
00410 sock->Send(ss.GetLocalPort(), fSize);
00411 else
00412 TSocket::Send(ss.GetLocalPort(), fSize);
00413
00414 fSockets = new TSocket*[fSize];
00415
00416
00417 for (i = 0; i < fSize; i++) {
00418 fSockets[i] = ss.Accept();
00419 R__LOCKGUARD2(gROOTMutex);
00420 gROOT->GetListOfSockets()->Remove(fSockets[i]);
00421 }
00422
00423
00424 SetOption(kNoDelay, 1);
00425 SetOption(kNoBlock, 1);
00426
00427
00428 if (sock)
00429 sock->Close();
00430 else
00431 gSystem->CloseConnection(fSocket, kFALSE);
00432 fSocket = -1;
00433 }
00434
00435 fWriteMonitor = new TMonitor;
00436 fReadMonitor = new TMonitor;
00437 fWriteBytesLeft = new Int_t[fSize];
00438 fReadBytesLeft = new Int_t[fSize];
00439 fWritePtr = new char*[fSize];
00440 fReadPtr = new char*[fSize];
00441
00442 for (i = 0; i < fSize; i++) {
00443 fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
00444 fReadMonitor->Add(fSockets[i], TMonitor::kRead);
00445 }
00446 fWriteMonitor->DeActivateAll();
00447 fReadMonitor->DeActivateAll();
00448 }
00449
00450
00451 TInetAddress TPSocket::GetLocalInetAddress()
00452 {
00453
00454
00455
00456 if (fSize<= 1)
00457 return TSocket::GetLocalInetAddress();
00458
00459 if (IsValid()) {
00460 if (fLocalAddress.GetPort() == -1)
00461 fLocalAddress = gSystem->GetSockName(fSockets[0]->GetDescriptor());
00462 return fLocalAddress;
00463 }
00464 return TInetAddress();
00465 }
00466
00467
00468 Int_t TPSocket::GetDescriptor() const
00469 {
00470
00471
00472 if (fSize <= 1)
00473 return TSocket::GetDescriptor();
00474
00475 return fSockets ? fSockets[0]->GetDescriptor() : -1;
00476
00477 }
00478
00479
00480 Int_t TPSocket::Send(const TMessage &mess)
00481 {
00482
00483
00484
00485
00486
00487
00488 if (!fSockets || fSize <= 1)
00489 return TSocket::Send(mess);
00490
00491 if (!IsValid()) {
00492 return -1;
00493 }
00494
00495 if (mess.IsReading()) {
00496 Error("Send", "cannot send a message used for reading");
00497 return -1;
00498 }
00499
00500
00501 SendStreamerInfos(mess);
00502
00503
00504 SendProcessIDs(mess);
00505
00506 mess.SetLength();
00507
00508 if (fCompress > 0 && mess.GetCompressionLevel() == 0)
00509 const_cast<TMessage&>(mess).SetCompressionLevel(fCompress);
00510
00511 if (mess.GetCompressionLevel() > 0)
00512 const_cast<TMessage&>(mess).Compress();
00513
00514 char *mbuf = mess.Buffer();
00515 Int_t mlen = mess.Length();
00516 if (mess.CompBuffer()) {
00517 mbuf = mess.CompBuffer();
00518 mlen = mess.CompLength();
00519 }
00520
00521 Int_t nsent, ulen = (Int_t) sizeof(UInt_t);
00522
00523 if ((nsent = SendRaw(mbuf, ulen, kDefault)) <= 0)
00524 return nsent;
00525
00526
00527 if ((nsent = SendRaw(mbuf+ulen, mlen-ulen, kDefault)) <= 0)
00528 return nsent;
00529
00530
00531 if (mess.What() & kMESS_ACK) {
00532 char buf[2];
00533 if (RecvRaw(buf, sizeof(buf), kDefault) < 0)
00534 return -1;
00535 if (strncmp(buf, "ok", 2)) {
00536 Error("Send", "bad acknowledgement");
00537 return -1;
00538 }
00539 }
00540
00541 return nsent;
00542 }
00543
00544
00545 Int_t TPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
00546 {
00547
00548
00549
00550 if (fSize == 1)
00551 return TSocket::SendRaw(buffer,length,opt);
00552
00553 if (!fSockets) return -1;
00554
00555
00556 Int_t i, nsocks = fSize, len = length;
00557 if (len < 4096)
00558 nsocks = 1;
00559
00560 ESendRecvOptions sendopt = kDontBlock;
00561 if (nsocks == 1)
00562 sendopt = kDefault;
00563
00564 if (opt != kDefault) {
00565 nsocks = 1;
00566 sendopt = opt;
00567 }
00568
00569 if (nsocks == 1)
00570 fSockets[0]->SetOption(kNoBlock, 0);
00571 else
00572 fSockets[0]->SetOption(kNoBlock, 1);
00573
00574
00575
00576 for (i = 0; i < nsocks; i++) {
00577 fWriteBytesLeft[i] = len/nsocks;
00578 fWritePtr[i] = (char *)buffer + (i*fWriteBytesLeft[i]);
00579 fWriteMonitor->Activate(fSockets[i]);
00580 }
00581 fWriteBytesLeft[nsocks-1] += len%nsocks;
00582
00583
00584 while (len > 0) {
00585 TSocket *s = fWriteMonitor->Select();
00586 for (int is = 0; is < nsocks; is++) {
00587 if (s == fSockets[is]) {
00588 if (fWriteBytesLeft[is] > 0) {
00589 Int_t nsent;
00590 again:
00591 if ((nsent = fSockets[is]->SendRaw(fWritePtr[is],
00592 fWriteBytesLeft[is],
00593 sendopt)) <= 0) {
00594 if (nsent == -4) {
00595
00596 goto again;
00597 }
00598 fWriteMonitor->DeActivateAll();
00599 if (nsent == -5) {
00600
00601 Close();
00602 }
00603 return -1;
00604 }
00605 if (opt == kDontBlock) {
00606 fWriteMonitor->DeActivateAll();
00607 return nsent;
00608 }
00609 fWriteBytesLeft[is] -= nsent;
00610 fWritePtr[is] += nsent;
00611 len -= nsent;
00612 }
00613 }
00614 }
00615 }
00616 fWriteMonitor->DeActivateAll();
00617
00618 return length;
00619 }
00620
00621
00622 Int_t TPSocket::Recv(TMessage *&mess)
00623 {
00624
00625
00626
00627
00628
00629 if (fSize <= 1)
00630 return TSocket::Recv(mess);
00631
00632 if (!IsValid()) {
00633 mess = 0;
00634 return -1;
00635 }
00636
00637 oncemore:
00638 Int_t n;
00639 UInt_t len;
00640 if ((n = RecvRaw(&len, sizeof(UInt_t), kDefault)) <= 0) {
00641 mess = 0;
00642 return n;
00643 }
00644 len = net2host(len);
00645
00646 char *buf = new char[len+sizeof(UInt_t)];
00647 if ((n = RecvRaw(buf+sizeof(UInt_t), len, kDefault)) <= 0) {
00648 delete [] buf;
00649 mess = 0;
00650 return n;
00651 }
00652
00653 mess = new TMessage(buf, len+sizeof(UInt_t));
00654
00655
00656 if (RecvStreamerInfos(mess))
00657 goto oncemore;
00658
00659
00660 if (RecvProcessIDs(mess))
00661 goto oncemore;
00662
00663 if (mess->What() & kMESS_ACK) {
00664 char ok[2] = { 'o', 'k' };
00665 if (SendRaw(ok, sizeof(ok), kDefault) < 0) {
00666 delete mess;
00667 mess = 0;
00668 return -1;
00669 }
00670 mess->SetWhat(mess->What() & ~kMESS_ACK);
00671 }
00672
00673 return n;
00674 }
00675
00676
00677 Int_t TPSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt)
00678 {
00679
00680
00681
00682 if (fSize <= 1)
00683 return TSocket::RecvRaw(buffer,length,opt);
00684
00685 if (!fSockets) return -1;
00686
00687
00688 Int_t i, nsocks = fSize, len = length;
00689 if (len < 4096)
00690 nsocks = 1;
00691
00692 ESendRecvOptions recvopt = kDontBlock;
00693 if (nsocks == 1)
00694 recvopt = kDefault;
00695
00696 if (opt != kDefault) {
00697 nsocks = 1;
00698 recvopt = opt;
00699 }
00700
00701 if (nsocks == 1)
00702 fSockets[0]->SetOption(kNoBlock, 0);
00703 else
00704 fSockets[0]->SetOption(kNoBlock, 1);
00705
00706
00707
00708 for (i = 0; i < nsocks; i++) {
00709 fReadBytesLeft[i] = len/nsocks;
00710 fReadPtr[i] = (char *)buffer + (i*fReadBytesLeft[i]);
00711 fReadMonitor->Activate(fSockets[i]);
00712 }
00713 fReadBytesLeft[nsocks-1] += len%nsocks;
00714
00715
00716
00717
00718 while (len > 0) {
00719 TSocket *s = fReadMonitor->Select();
00720 for (int is = 0; is < nsocks; is++) {
00721 if (s == fSockets[is]) {
00722 if (fReadBytesLeft[is] > 0) {
00723 Int_t nrecv;
00724 if ((nrecv = fSockets[is]->RecvRaw(fReadPtr[is],
00725 fReadBytesLeft[is],
00726 recvopt)) <= 0) {
00727 fReadMonitor->DeActivateAll();
00728 if (nrecv == -5) {
00729
00730 Close();
00731 }
00732 return -1;
00733 }
00734 if (opt == kDontBlock) {
00735 fReadMonitor->DeActivateAll();
00736 return nrecv;
00737 }
00738 fReadBytesLeft[is] -= nrecv;
00739 fReadPtr[is] += nrecv;
00740 len -= nrecv;
00741 }
00742 }
00743 }
00744 }
00745 fReadMonitor->DeActivateAll();
00746
00747 return length;
00748 }
00749
00750
00751 Int_t TPSocket::SetOption(ESockOptions opt, Int_t val)
00752 {
00753
00754
00755 if (fSize <= 1)
00756 return TSocket::SetOption(opt,val);
00757
00758 Int_t ret = 0;
00759 for (int i = 0; i < fSize; i++)
00760 ret = fSockets[i]->SetOption(opt, val);
00761 return ret;
00762 }
00763
00764
00765 Int_t TPSocket::GetOption(ESockOptions opt, Int_t &val)
00766 {
00767
00768
00769 if (fSize <= 1)
00770 return TSocket::GetOption(opt,val);
00771
00772 Int_t ret = 0;
00773 for (int i = 0; i < fSize; i++)
00774 ret = fSockets[i]->GetOption(opt, val);
00775 return ret;
00776 }
00777
00778
00779 Int_t TPSocket::GetErrorCode() const
00780 {
00781
00782
00783
00784 if (fSize <= 1)
00785 return TSocket::GetErrorCode();
00786
00787 return fSockets[0] ? fSockets[0]->GetErrorCode() : 0;
00788 }