00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdLinkCVSID = "$Id: XrdLink.cc 38011 2011-02-08 18:35:57Z ganis $";
00014
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

#include <new>
00022
00023 #ifdef __linux__
00024 #include <netinet/tcp.h>
00025 #if !defined(TCP_CORK)
00026 #undef HAVE_SENDFILE
00027 #endif
00028 #endif
00029
00030 #ifdef HAVE_SENDFILE
00031
00032 #ifndef __macos__
00033 #include <sys/sendfile.h>
00034 #endif
00035
00036 #endif
00037
00038 #include "XrdNet/XrdNetDNS.hh"
00039 #include "XrdNet/XrdNetPeer.hh"
00040 #include "XrdSys/XrdSysError.hh"
00041 #include "XrdSys/XrdSysPlatform.hh"
00042
00043 #include "Xrd/XrdBuffer.hh"
00044 #include "Xrd/XrdLink.hh"
00045 #include "Xrd/XrdInet.hh"
00046 #include "Xrd/XrdPoll.hh"
00047 #include "Xrd/XrdScheduler.hh"
00048 #define TRACELINK this
00049 #include "Xrd/XrdTrace.hh"
00050
00051
00052
00053
00054
00055
00056
00057
00058 class XrdLinkScan : XrdJob
00059 {
00060 public:
00061
00062 void DoIt() {idleScan();}
00063
00064 XrdLinkScan(int im, int it, const char *lt="idle link scan") :
00065 XrdJob(lt)
00066 {idleCheck = im; idleTicks = it;}
00067 ~XrdLinkScan() {}
00068
00069 private:
00070
00071 void idleScan();
00072
00073 int idleCheck;
00074 int idleTicks;
00075
00076 static const char *TraceID;
00077 };
00078
00079
00080
00081
00082
00083 extern XrdSysError XrdLog;
00084
00085 extern XrdScheduler XrdSched;
00086
00087 extern XrdInet *XrdNetTCP;
00088 extern XrdOucTrace XrdTrace;
00089
00090 #if defined(HAVE_SENDFILE)
00091 int XrdLink::sfOK = 1;
00092 #else
00093 int XrdLink::sfOK = 0;
00094 #endif
00095
00096 XrdLink **XrdLink::LinkTab;
00097 char *XrdLink::LinkBat;
00098 unsigned int XrdLink::LinkAlloc;
00099 int XrdLink::LTLast = -1;
00100 XrdSysMutex XrdLink::LTMutex;
00101
00102 const char *XrdLink::TraceID = "Link";
00103
00104 long long XrdLink::LinkBytesIn = 0;
00105 long long XrdLink::LinkBytesOut = 0;
00106 long long XrdLink::LinkConTime = 0;
00107 long long XrdLink::LinkCountTot = 0;
00108 int XrdLink::LinkCount = 0;
00109 int XrdLink::LinkCountMax = 0;
00110 int XrdLink::LinkTimeOuts = 0;
00111 int XrdLink::LinkStalls = 0;
00112 int XrdLink::LinkSfIntr = 0;
00113 XrdSysMutex XrdLink::statsMutex;
00114
00115 const char *XrdLinkScan::TraceID = "LinkScan";
00116 int XrdLink::devNull = open("/dev/null", O_RDONLY);
00117 short XrdLink::killWait= 3;
00118 short XrdLink::waitKill= 4;
00119
00120
00121
00122 #define XRDLINK_FREE 0x00
00123 #define XRDLINK_USED 0x01
00124 #define XRDLINK_IDLE 0x02
00125
00126
00127
00128
00129
00130 XrdLink::XrdLink() : XrdJob("connection"), IOSemaphore(0, "link i/o")
00131 {
00132 Etext = 0;
00133 HostName = 0;
00134 Reset();
00135 }
00136
00137 void XrdLink::Reset()
00138 {
00139 FD = -1;
00140 if (Etext) {free(Etext); Etext = 0;}
00141 if (HostName) {free(HostName); HostName = 0;}
00142 Uname[sizeof(Uname)-1] = '@';
00143 Uname[sizeof(Uname)-2] = '?';
00144 Lname[0] = '?';
00145 Lname[1] = '\0';
00146 ID = &Uname[sizeof(Uname)-2];
00147 Comment = ID;
00148 Next = 0;
00149 Protocol = 0;
00150 ProtoAlt = 0;
00151 conTime = time(0);
00152 stallCnt = stallCntTot = 0;
00153 tardyCnt = tardyCntTot = 0;
00154 InUse = 1;
00155 Poller = 0;
00156 PollEnt = 0;
00157 isEnabled= 0;
00158 isIdle = 0;
00159 inQ = 0;
00160 tBound = 0;
00161 BytesOut = BytesIn = BytesOutTot = BytesInTot = 0;
00162 doPost = 0;
00163 LockReads= 0;
00164 KeepFD = 0;
00165 udpbuff = 0;
00166 Instance = 0;
00167 KillcvP = 0;
00168 KillCnt = 0;
00169 }
00170
00171
00172
00173
00174
00175 XrdLink *XrdLink::Alloc(XrdNetPeer &Peer, int opts)
00176 {
00177 static XrdSysMutex instMutex;
00178 static unsigned int myInstance = 1;
00179 XrdLink *lp;
00180 char *unp, buff[16];
00181 int bl;
00182
00183
00184
00185 LTMutex.Lock();
00186 if (LinkBat[Peer.fd])
00187 {LTMutex.UnLock();
00188 XrdLog.Emsg("Link", "attempt to reuse active link");
00189 return (XrdLink *)0;
00190 }
00191
00192
00193
00194
00195 if (!(lp = LinkTab[Peer.fd]))
00196 {unsigned int i;
00197 XrdLink **blp, *nlp = new XrdLink[LinkAlloc]();
00198 if (!nlp)
00199 {LTMutex.UnLock();
00200 XrdLog.Emsg("Link", ENOMEM, "create link");
00201 return (XrdLink *)0;
00202 }
00203 blp = &LinkTab[Peer.fd/LinkAlloc*LinkAlloc];
00204 for (i = 0; i < LinkAlloc; i++, blp++) *blp = &nlp[i];
00205 lp = LinkTab[Peer.fd];
00206 }
00207 else lp->Reset();
00208 LinkBat[Peer.fd] = XRDLINK_USED;
00209 if (Peer.fd > LTLast) LTLast = Peer.fd;
00210 LTMutex.UnLock();
00211
00212
00213
00214
00215
00216 instMutex.Lock();
00217 lp->Instance = myInstance++;
00218 instMutex.UnLock();
00219
00220
00221
00222 memcpy((void *)&(lp->InetAddr), (const void *)&Peer.InetAddr,
00223 sizeof(struct sockaddr));
00224 if (Peer.InetName) strlcpy(lp->Lname, Peer.InetName, sizeof(lp->Lname));
00225 else {char *host = XrdNetDNS::getHostName(Peer.InetAddr);
00226 strlcpy(lp->Lname, host, sizeof(lp->Lname));
00227 free(host);
00228 }
00229 lp->HostName = strdup(lp->Lname);
00230 lp->HNlen = strlen(lp->HostName);
00231 XrdNetTCP->Trim(lp->Lname);
00232 bl = sprintf(buff, "?:%d", Peer.fd);
00233 unp = lp->Lname - bl - 1;
00234 strncpy(unp, buff, bl);
00235 lp->ID = unp;
00236 lp->FD = Peer.fd;
00237 lp->udpbuff = Peer.InetBuff;
00238 lp->Comment = (const char *)unp;
00239
00240
00241
00242 lp->LockReads = (0 != (opts & XRDLINK_RDLOCK));
00243 lp->KeepFD = (0 != (opts & XRDLINK_NOCLOSE));
00244
00245
00246
00247 statsMutex.Lock();
00248 LinkCountTot++;
00249 if (LinkCountMax == LinkCount++) LinkCountMax = LinkCount;
00250 statsMutex.UnLock();
00251 return lp;
00252 }
00253
00254
00255
00256
00257
00258 void XrdLink::Bind(pthread_t tid)
00259 {
00260
00261
00262
00263 TID = tid;
00264 tBound = 1;
00265 }
00266
00267
00268
00269 void XrdLink::Bind()
00270 {
00271 #ifdef __linux__
00272 pthread_t curTID = (tBound ? TID : XrdSysThread::ID());
00273 #endif
00274
00275
00276
00277
00278 if (tBound)
00279 {tBound = 0;
00280 #ifdef __linux__
00281 if (!XrdSysThread::Same(curTID, XrdSysThread::ID()))
00282 {XrdSysThread::Signal(curTID, SIGSTOP);
00283 XrdSysThread::Signal(curTID, SIGCONT);
00284 }
00285 #endif
00286 }
00287 }
00288
00289
00290
00291
00292
00293 int XrdLink::Client(char *nbuf, int nbsz)
00294 {
00295 int ulen;
00296
00297
00298
00299 if (nbsz <= 0) return 0;
00300 ulen = (Lname - ID);
00301 if ((ulen + HNlen) >= nbsz) ulen = 0;
00302 else {strncpy(nbuf, ID, ulen);
00303 strcpy(nbuf+ulen, HostName);
00304 ulen += HNlen;
00305 }
00306 return ulen;
00307 }
00308
00309
00310
00311
00312
00313 int XrdLink::Close(int defer)
00314 { int csec, fd, rc = 0;
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 opMutex.Lock();
00328 if (defer)
00329 {TRACEI(DEBUG, "Closing FD only");
00330 if (FD > 1)
00331 {fd = FD; FD = -FD; csec = Instance; Instance = 0;
00332 if (!KeepFD)
00333 {shutdown(fd, SHUT_RDWR);
00334 if (dup2(devNull, fd) < 0)
00335 {FD = fd; Instance = csec;
00336 XrdLog.Emsg("Link",errno,"close FD for",ID);
00337 } else Bind();
00338 }
00339 }
00340 opMutex.UnLock();
00341 return 0;
00342 }
00343
00344
00345
00346
00347 while(InUse > 1)
00348 {opMutex.UnLock();
00349 TRACEI(DEBUG, "Close defered, use count=" <<InUse);
00350 Serialize();
00351 opMutex.Lock();
00352 }
00353 InUse--;
00354 Instance = 0;
00355
00356
00357
00358 syncStats(&csec);
00359
00360
00361
00362 if (Protocol) {Protocol->Recycle(this, csec, Etext); Protocol = 0;}
00363 if (ProtoAlt) {ProtoAlt->Recycle(this, csec, Etext); ProtoAlt = 0;}
00364 if (udpbuff) {udpbuff->Recycle(); udpbuff = 0;}
00365 if (Etext) {free(Etext); Etext = 0;}
00366 InUse = 0;
00367
00368
00369
00370
00371
00372
00373 if (KillcvP) {KillcvP-> Lock(); KillcvP->Signal();
00374 KillcvP->UnLock(); KillcvP = 0;
00375 }
00376
00377
00378
00379
00380
00381
00382 fd = (FD < 0 ? -FD : FD);
00383 if (FD != -1)
00384 {if (Poller) {XrdPoll::Detach(this); Poller = 0;}
00385 FD = -1;
00386 opMutex.UnLock();
00387 LTMutex.Lock();
00388 LinkBat[fd] = XRDLINK_FREE;
00389 if (fd == LTLast) while(LTLast && !(LinkBat[LTLast])) LTLast--;
00390 LTMutex.UnLock();
00391 } else opMutex.UnLock();
00392
00393
00394
00395
00396 if (fd >= 2) {if (KeepFD) rc = 0;
00397 else rc = (close(fd) < 0 ? errno : 0);
00398 }
00399 if (rc) XrdLog.Emsg("Link", rc, "close", ID);
00400 return rc;
00401 }
00402
00403
00404
00405
00406
00407 void XrdLink::DoIt()
00408 {
00409 int rc;
00410
00411
00412
00413
00414
00415
00416
00417
00418 if (Protocol)
00419 do {rc = Protocol->Process(this);} while (!rc && XrdSched.canStick());
00420 else {XrdLog.Emsg("Link", "Dispatch on closed link", ID);
00421 return;
00422 }
00423
00424
00425
00426
00427 if (rc >= 0) {if (Poller) Poller->Enable(this);}
00428 else if (rc != -EINPROGRESS) Close();
00429 }
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440 XrdLink *XrdLink::Find(int &curr, XrdLinkMatch *who)
00441 {
00442 XrdLink *lp;
00443 const int MaxSeek = 16;
00444 unsigned int myINS;
00445 int i, seeklim = MaxSeek;
00446
00447
00448
00449 LTMutex.Lock();
00450 if (curr >= 0 && LinkTab[curr]) LinkTab[curr]->setRef(-1);
00451 else curr = -1;
00452
00453
00454
00455
00456
00457 for (i = curr+1; i <= LTLast; i++)
00458 {if ((lp = LinkTab[i]) && LinkBat[i] && lp->HostName)
00459 if (!who
00460 || who->Match(lp->ID,lp->Lname-lp->ID-1,lp->HostName,lp->HNlen))
00461 {myINS = lp->Instance;
00462 LTMutex.UnLock();
00463 lp->setRef(1);
00464 curr = i;
00465 if (myINS == lp->Instance) return lp;
00466 LTMutex.Lock();
00467 }
00468 if (!seeklim--) {LTMutex.UnLock(); seeklim = MaxSeek; LTMutex.Lock();}
00469 }
00470
00471
00472
00473 LTMutex.UnLock();
00474 curr = -1;
00475 return 0;
00476 }
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486 int XrdLink::getName(int &curr, char *nbuf, int nbsz, XrdLinkMatch *who)
00487 {
00488 XrdLink *lp;
00489 const int MaxSeek = 16;
00490 int i, ulen = 0, seeklim = MaxSeek;
00491
00492
00493
00494
00495
00496 LTMutex.Lock();
00497 for (i = curr+1; i <= LTLast; i++)
00498 {if ((lp = LinkTab[i]) && LinkBat[i] && lp->HostName)
00499 if (!who
00500 || who->Match(lp->ID,lp->Lname-lp->ID-1,lp->HostName,lp->HNlen))
00501 {ulen = lp->Client(nbuf, nbsz);
00502 LTMutex.UnLock();
00503 curr = i;
00504 return ulen;
00505 }
00506 if (!seeklim--) {LTMutex.UnLock(); seeklim = MaxSeek; LTMutex.Lock();}
00507 }
00508 LTMutex.UnLock();
00509
00510
00511
00512 curr = -1;
00513 return 0;
00514 }
00515
00516
00517
00518
00519
00520 int XrdLink::Peek(char *Buff, int Blen, int timeout)
00521 {
00522 XrdSysMutexHelper theMutex;
00523 struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00524 ssize_t mlen;
00525 int retc;
00526
00527
00528
00529 if (LockReads) theMutex.Lock(&rdMutex);
00530
00531
00532
00533 isIdle = 0;
00534 do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
00535 if (retc != 1)
00536 {if (retc == 0) return 0;
00537 return XrdLog.Emsg("Link", -errno, "poll", ID);
00538 }
00539
00540
00541
00542 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
00543 {XrdLog.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
00544 "polling", ID);
00545 return -1;
00546 }
00547
00548
00549
00550 do {mlen = recv(FD, Buff, Blen, MSG_PEEK);}
00551 while(mlen < 0 && errno == EINTR);
00552
00553
00554
00555 if (mlen >= 0) return int(mlen);
00556 XrdLog.Emsg("Link", errno, "peek on", ID);
00557 return -1;
00558 }
00559
00560
00561
00562
00563
00564 int XrdLink::Recv(char *Buff, int Blen)
00565 {
00566 ssize_t rlen;
00567
00568
00569
00570
00571 if (LockReads) rdMutex.Lock();
00572 isIdle = 0;
00573 do {rlen = read(FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
00574 if (LockReads) rdMutex.UnLock();
00575
00576 if (rlen >= 0) return int(rlen);
00577 if (FD >= 0) XrdLog.Emsg("Link", errno, "receive from", ID);
00578 return -1;
00579 }
00580
00581
00582
00583 int XrdLink::Recv(char *Buff, int Blen, int timeout)
00584 {
00585 XrdSysMutexHelper theMutex;
00586 struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00587 ssize_t rlen, totlen = 0;
00588 int retc;
00589
00590
00591
00592 if (LockReads) theMutex.Lock(&rdMutex);
00593
00594
00595
00596 isIdle = 0;
00597 while(Blen > 0)
00598 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
00599 if (retc != 1)
00600 {if (retc == 0)
00601 {tardyCnt++;
00602 if (totlen && (++stallCnt & 0xff) == 1)
00603 TRACEI(DEBUG, "read timed out");
00604 return int(totlen);
00605 }
00606 return (FD >= 0 ? XrdLog.Emsg("Link", -errno, "poll", ID) : -1);
00607 }
00608
00609
00610
00611 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
00612 {XrdLog.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
00613 "polling", ID);
00614 return -1;
00615 }
00616
00617
00618
00619
00620 do {rlen = recv(FD, Buff, Blen, 0);} while(rlen < 0 && errno == EINTR);
00621 if (rlen <= 0)
00622 {if (!rlen) return -ENOMSG;
00623 return (FD<0 ? -1 : XrdLog.Emsg("Link",-errno,"receive from",ID));
00624 }
00625 BytesIn += rlen; totlen += rlen; Blen -= rlen; Buff += rlen;
00626 }
00627
00628 return int(totlen);
00629 }
00630
00631
00632
00633
00634
00635
00636 int XrdLink::RecvAll(char *Buff, int Blen, int timeout)
00637 {
00638 struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
00639 ssize_t rlen;
00640 int retc;
00641
00642
00643
00644
00645 if (timeout >= 0)
00646 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
00647 if (retc != 1)
00648 {if (!retc) return -ETIMEDOUT;
00649 XrdLog.Emsg("Link",errno,"poll",ID);
00650 return -1;
00651 }
00652 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
00653 {XrdLog.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
00654 return -1;
00655 }
00656 }
00657
00658
00659
00660 if (LockReads) rdMutex.Lock();
00661 isIdle = 0;
00662 do {rlen = recv(FD,Buff,Blen,MSG_WAITALL);} while(rlen < 0 && errno == EINTR);
00663 if (LockReads) rdMutex.UnLock();
00664
00665 if (int(rlen) == Blen) return Blen;
00666 if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
00667 else if (rlen > 0) XrdLog.Emsg("RecvAll","Premature end from", ID);
00668 else if (FD >= 0) XrdLog.Emsg("Link",errno,"recieve from",ID);
00669 return -1;
00670 }
00671
00672
00673
00674
00675
00676 int XrdLink::Send(const char *Buff, int Blen)
00677 {
00678 ssize_t retc = 0, bytesleft = Blen;
00679
00680
00681
00682 wrMutex.Lock();
00683 isIdle = 0;
00684
00685
00686
00687 while(bytesleft)
00688 {if ((retc = write(FD, Buff, bytesleft)) < 0)
00689 {if (errno == EINTR) continue;
00690 else break;
00691 }
00692 BytesOut += retc; bytesleft -= retc; Buff += retc;
00693 }
00694
00695
00696
00697 wrMutex.UnLock();
00698 if (retc >= 0) return Blen;
00699 XrdLog.Emsg("Link", errno, "send to", ID);
00700 return -1;
00701 }
00702
00703
00704
00705 int XrdLink::Send(const struct iovec *iov, int iocnt, int bytes)
00706 {
00707 ssize_t bytesleft, n, retc = 0;
00708 const char *Buff;
00709 int i;
00710
00711
00712
00713 if (!bytes) for (i = 0; i < iocnt; i++) bytes += iov[i].iov_len;
00714 bytesleft = static_cast<ssize_t>(bytes);
00715
00716
00717
00718 wrMutex.Lock();
00719 isIdle = 0;
00720 BytesOut += bytes;
00721
00722
00723
00724
00725
00726
00727
00728
00729 while(bytesleft)
00730 {do {retc = writev(FD, iov, iocnt);} while(retc < 0 && errno == EINTR);
00731 if (retc >= bytesleft || retc < 0) break;
00732 bytesleft -= retc;
00733 while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
00734 {retc -= n; iov++; iocnt--;}
00735 Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
00736 while(n) {if ((retc = write(FD, Buff, n)) < 0)
00737 {if (errno == EINTR) continue;
00738 else break;
00739 }
00740 n -= retc; Buff += retc;
00741 }
00742 if (retc < 0 || iocnt < 1) break;
00743 }
00744
00745
00746
00747 wrMutex.UnLock();
00748 if (retc >= 0) return bytes;
00749 XrdLog.Emsg("Link", errno, "send to", ID);
00750 return -1;
00751 }
00752
00753
00754 int XrdLink::Send(const struct sfVec *sfP, int sfN)
00755 {
00756 #if !defined(HAVE_SENDFILE)
00757 return -1;
00758 #else
00759
00760
00761 if (sfN < 1 || sfN > sfMax)
00762 {XrdLog.Emsg("Link", EINVAL, "send file to", ID);
00763 return -1;
00764 }
00765
00766 #ifdef __solaris__
00767 sendfilevec_t vecSF[sfMax], *vecSFP = vecSF;
00768 size_t xframt, totamt, bytes = 0;
00769 ssize_t retc;
00770 int i = 0;
00771
00772
00773
00774 for (i = 0; i < sfN; sfP++, i++)
00775 {if (sfP->fdnum < 0)
00776 {vecSF[i].sfv_fd = SFV_FD_SELF;
00777 vecSF[i].sfv_off = (off_t)sfP->buffer;
00778 } else {
00779 vecSF[i].sfv_fd = sfP->fdnum;
00780 vecSF[i].sfv_off = sfP->offset;
00781 }
00782 vecSF[i].sfv_flag = 0;
00783 vecSF[i].sfv_len = sfP->sendsz;
00784 bytes += sfP->sendsz;
00785 }
00786 totamt = bytes;
00787
00788
00789
00790
00791
00792 wrMutex.Lock();
00793 isIdle = 0;
00794 do{retc = sendfilev(FD, vecSFP, sfN, &xframt);
00795
00796
00797
00798 if (xframt == bytes)
00799 {BytesOut += bytes;
00800 wrMutex.UnLock();
00801 return totamt;
00802 }
00803
00804
00805
00806 if (retc < 0 && errno != EINTR) break;
00807
00808
00809
00810 if (xframt > 0)
00811 {BytesOut += xframt; bytes -= xframt; SfIntr++;
00812 while(xframt > 0 && sfN)
00813 {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
00814 {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
00815 xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
00816 }
00817 }
00818 } while(sfN > 0);
00819
00820
00821
00822 retc = (retc < 0 ? errno : ECANCELED);
00823 wrMutex.UnLock();
00824 XrdLog.Emsg("Link", retc, "send file to", ID);
00825 return -1;
00826
00827 #elif defined(__linux__)
00828
00829 static const int setON = 1, setOFF = 0;
00830 ssize_t retc = 0, bytesleft;
00831 off_t myOffset;
00832 int i, xfrbytes = 0, uncork = 1, xIntr = 0;
00833
00834
00835
00836 wrMutex.Lock();
00837 isIdle = 0;
00838
00839
00840
00841
00842 if (setsockopt(FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
00843 {XrdLog.Emsg("Link", errno, "cork socket for", ID);
00844 uncork = 0; sfOK = 0;
00845 }
00846
00847
00848
00849 for (i = 0; i < sfN; sfP++, i++)
00850 {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
00851 else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
00852 while(bytesleft
00853 && (retc=sendfile(FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
00854 {myOffset += retc; bytesleft -= retc; xIntr++;}
00855 }
00856 if (retc < 0 && errno == EINTR) continue;
00857 if (retc <= 0) break;
00858 xfrbytes += sfP->sendsz;
00859 }
00860
00861
00862
00863 if (retc <= 0)
00864 {if (retc == 0) errno = ECANCELED;
00865 wrMutex.UnLock();
00866 XrdLog.Emsg("Link", errno, "send file to", ID);
00867 return -1;
00868 }
00869
00870
00871
00872 if (uncork && setsockopt(FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
00873 XrdLog.Emsg("Link", errno, "uncork socket for", ID);
00874
00875
00876
00877 if (xIntr > sfN) SfIntr += (xIntr - sfN);
00878 BytesOut += xfrbytes;
00879 wrMutex.UnLock();
00880 return xfrbytes;
00881 #endif
00882 #endif
00883 }
00884
00885
00886
00887
00888
00889 int XrdLink::sendData(const char *Buff, int Blen)
00890 {
00891 ssize_t retc = 0, bytesleft = Blen;
00892
00893
00894
00895 while(bytesleft)
00896 {if ((retc = write(FD, Buff, bytesleft)) < 0)
00897 {if (errno == EINTR) continue;
00898 else break;
00899 }
00900 bytesleft -= retc; Buff += retc;
00901 }
00902
00903
00904
00905 return retc;
00906 }
00907
00908
00909
00910
00911
00912 int XrdLink::setEtext(const char *text)
00913 {
00914 opMutex.Lock();
00915 if (Etext) free(Etext);
00916 Etext = (text ? strdup(text) : 0);
00917 opMutex.UnLock();
00918 return -1;
00919 }
00920
00921
00922
00923
00924
00925 void XrdLink::setID(const char *userid, int procid)
00926 {
00927 char buff[sizeof(Uname)], *bp, *sp;
00928 int ulen;
00929
00930 snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, FD);
00931 ulen = strlen(buff);
00932 sp = buff + ulen - 1;
00933 bp = &Uname[sizeof(Uname)-1];
00934 if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
00935 *bp = '@'; bp--;
00936 while(ulen--) {*bp = *sp; bp--; sp--;}
00937 ID = bp+1;
00938 Comment = (const char *)ID;
00939 }
00940
00941
00942
00943
00944
00945 int XrdLink::Setup(int maxfds, int idlewait)
00946 {
00947 int numalloc, iticks, ichk;
00948
00949
00950
00951 fcntl(devNull, F_SETFD, FD_CLOEXEC);
00952
00953
00954
00955
00956 numalloc = 8192 / sizeof(XrdLink);
00957 LinkAlloc = 1;
00958 while((numalloc = numalloc/2)) LinkAlloc = LinkAlloc*2;
00959 TRACE(DEBUG, "Allocating " <<LinkAlloc <<" link objects at a time");
00960
00961
00962
00963 if (!(LinkTab = (XrdLink **)malloc(maxfds*sizeof(XrdLink *)+LinkAlloc)))
00964 {XrdLog.Emsg("Link", ENOMEM, "create LinkTab"); return 0;}
00965 memset((void *)LinkTab, 0, maxfds*sizeof(XrdLink *));
00966
00967
00968
00969 if (!(LinkBat = (char *)malloc(maxfds*sizeof(char)+LinkAlloc)))
00970 {XrdLog.Emsg("Link", ENOMEM, "create LinkBat"); return 0;}
00971 memset((void *)LinkBat, XRDLINK_FREE, maxfds*sizeof(char));
00972
00973
00974
00975 if (idlewait)
00976 {if (!(ichk = idlewait/3)) {iticks = 1; ichk = idlewait;}
00977 else iticks = 3;
00978 XrdLinkScan *ls = new XrdLinkScan(ichk, iticks);
00979 XrdSched.Schedule((XrdJob *)ls, ichk+time(0));
00980 }
00981
00982 return 1;
00983 }
00984
00985
00986
00987
00988
00989 void XrdLink::Serialize()
00990 {
00991
00992
00993
00994
00995
00996 opMutex.Lock();
00997 if (InUse <= 1) opMutex.UnLock();
00998 else {doPost++;
00999 opMutex.UnLock();
01000 TRACEI(DEBUG, "Waiting for link serialization; use=" <<InUse);
01001 IOSemaphore.Wait();
01002 }
01003 }
01004
01005
01006
01007
01008
01009 void XrdLink::setKWT(int wkSec, int kwSec)
01010 {
01011 if (wkSec > 0) waitKill = static_cast<short>(wkSec);
01012 if (kwSec > 0) killWait = static_cast<short>(kwSec);
01013 }
01014
01015
01016
01017
01018
01019 XrdProtocol *XrdLink::setProtocol(XrdProtocol *pp)
01020 {
01021
01022
01023
01024 opMutex.Lock();
01025 XrdProtocol *op = Protocol;
01026 Protocol = pp;
01027 opMutex.UnLock();
01028 return op;
01029 }
01030
01031
01032
01033
01034
01035 void XrdLink::setRef(int use)
01036 {
01037 opMutex.Lock();
01038 TRACEI(DEBUG,"Setting ref to " <<InUse <<'+' <<use <<" post=" <<doPost);
01039 InUse += use;
01040
01041 if (!InUse)
01042 {InUse = 1; opMutex.UnLock();
01043 XrdLog.Emsg("Link", "Zero use count for", ID);
01044 }
01045 else if (InUse == 1 && doPost)
01046 {doPost--;
01047 IOSemaphore.Post();
01048 TRACEI(CONN, "setRef posted link");
01049 opMutex.UnLock();
01050 }
01051 else if (InUse < 0)
01052 {InUse = 1;
01053 opMutex.UnLock();
01054 XrdLog.Emsg("Link", "Negative use count for", ID);
01055 }
01056 else opMutex.UnLock();
01057 }
01058
01059
01060
01061
01062
01063 int XrdLink::Stats(char *buff, int blen, int do_sync)
01064 {
01065 static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
01066 "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
01067 "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
01068 "<sfps>%d</sfps></stats>";
01069 int i, myLTLast;
01070
01071
01072
01073 if (!buff) return sizeof(statfmt)+17*6;
01074
01075
01076
01077 if (do_sync)
01078 {LTMutex.Lock(); myLTLast = LTLast; LTMutex.UnLock();
01079 for (i = 0; i <= myLTLast; i++)
01080 if (LinkBat[i] == XRDLINK_USED && LinkTab[i])
01081 LinkTab[i]->syncStats();
01082 }
01083
01084
01085
01086 statsMutex.Lock();
01087 i = snprintf(buff, blen, statfmt, LinkCount, LinkCountMax, LinkCountTot,
01088 LinkBytesIn, LinkBytesOut, LinkConTime,
01089 LinkTimeOuts,LinkStalls, LinkSfIntr);
01090 statsMutex.UnLock();
01091 return i;
01092 }
01093
01094
01095
01096
01097
01098 void XrdLink::syncStats(int *ctime)
01099 {
01100
01101
01102
01103 if (!ctime) opMutex.Lock();
01104
01105
01106
01107
01108 statsMutex.Lock();
01109 rdMutex.Lock();
01110 LinkBytesIn += BytesIn; BytesInTot += BytesIn; BytesIn = 0;
01111 LinkTimeOuts += tardyCnt; tardyCntTot += tardyCnt; tardyCnt = 0;
01112 LinkStalls += stallCnt; stallCntTot += stallCnt; stallCnt = 0;
01113 rdMutex.UnLock();
01114 wrMutex.Lock();
01115 LinkBytesOut += BytesOut; BytesOutTot += BytesOut;BytesOut = 0;
01116 LinkSfIntr += SfIntr; SfIntr = 0;
01117 wrMutex.UnLock();
01118 if (ctime)
01119 {*ctime = time(0) - conTime;
01120 LinkConTime += *ctime;
01121 if (!(LinkCount--)) LinkCount = 0;
01122 }
01123 statsMutex.UnLock();
01124
01125
01126
01127 if (Protocol) Protocol->Stats(0, 0, 1);
01128
01129
01130
01131 if (!ctime) opMutex.UnLock();
01132 }
01133
01134
01135
01136
01137
01138 int XrdLink::Terminate(const XrdLink *owner, int fdnum, unsigned int inst)
01139 {
01140 XrdSysCondVar killDone(0);
01141 XrdLink *lp;
01142 char buff[1024], *cp;
01143 int wTime, didKW = KillCnt & KillXwt;
01144
01145
01146
01147 KillCnt = KillCnt & KillMsk;
01148 if (!(lp = fd2link(fdnum, inst))) return (didKW ? -EPIPE : -ESRCH);
01149
01150
01151
01152 if (lp == owner) return 0;
01153
01154
01155
01156 lp->Serialize();
01157 lp->opMutex.Lock();
01158
01159
01160
01161
01162 if ( lp->FD != fdnum || lp->Instance != inst
01163 || !(lp->Poller) || !(lp->Protocol))
01164 {lp->opMutex.UnLock();
01165 return -EPIPE;
01166 }
01167
01168
01169
01170 if (owner
01171 && (!(cp = index(owner->ID, ':'))
01172 || strncmp(lp->ID, owner->ID, cp-(owner->ID))
01173 || strcmp(owner->Lname, lp->Lname)))
01174 {lp->opMutex.UnLock();
01175 return -EACCES;
01176 }
01177
01178
01179
01180 if (lp->KillCnt > KillMax)
01181 {lp->opMutex.UnLock();
01182 return -ETIME;
01183 }
01184 wTime = lp->KillCnt++;
01185
01186
01187
01188
01189 if (!(lp->isEnabled) || lp->InUse > 1 || lp->KillcvP)
01190 {wTime = wTime*2+waitKill;
01191 KillCnt |= KillXwt;
01192 lp->opMutex.UnLock();
01193 return (wTime > 60 ? 60: wTime);
01194 }
01195
01196
01197
01198 lp->KillcvP = &killDone;
01199 killDone.Lock();
01200
01201
01202
01203 snprintf(buff, sizeof(buff), "ended by %s", ID);
01204 buff[sizeof(buff)-1] = '\0';
01205 lp->Poller->Disable(lp, buff);
01206 lp->opMutex.UnLock();
01207
01208
01209
01210 if (killDone.Wait(int(killWait))) {wTime += killWait; KillCnt |= KillXwt;}
01211 else wTime = -EPIPE;
01212 killDone.UnLock();
01213
01214
01215
01216
01217
01218
01219 lp->opMutex.Lock(); lp->KillcvP = 0; lp->opMutex.UnLock();
01220
01221
01222
01223 TRACEI(DEBUG,"Terminate " << (wTime <= 0 ? "complete ":"timeout ") <<wTime);
01224 return wTime;
01225 }
01226
01227
01228
01229
01230
01231 #undef TRACELINK
01232 #define TRACELINK lp
01233
01234 void XrdLinkScan::idleScan()
01235 {
01236 XrdLink *lp;
01237 int i, ltlast, lnum = 0, tmo = 0, tmod = 0;
01238
01239
01240
01241 XrdLink::LTMutex.Lock();
01242 ltlast = XrdLink::LTLast;
01243 XrdLink::LTMutex.UnLock();
01244
01245
01246
01247
01248 for (i = 0; i <= ltlast; i++)
01249 {if (XrdLink::LinkBat[i] != XRDLINK_USED
01250 || !(lp = XrdLink::LinkTab[i])) continue;
01251 lnum++;
01252 lp->opMutex.Lock();
01253 if (lp->isIdle) tmo++;
01254 lp->isIdle++;
01255 if ((int(lp->isIdle)) < idleTicks) {lp->opMutex.UnLock(); continue;}
01256 lp->isIdle = 0;
01257 if (!(lp->Poller) || !(lp->isEnabled))
01258 XrdLog.Emsg("LinkScan","Link",lp->ID,"is disabled and idle.");
01259 else if (lp->InUse == 1)
01260 {lp->Poller->Disable(lp, "idle timeout");
01261 tmod++;
01262 }
01263 lp->opMutex.UnLock();
01264 }
01265
01266
01267
01268 TRACE(CONN, lnum <<" links; " <<tmo <<" idle; " <<tmod <<" force closed");
01269
01270
01271
01272 XrdSched.Schedule((XrdJob *)this, idleCheck+time(0));
01273 }