00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 const char *XrdClientConnMgrCVSID = "$Id: XrdClientConnMgr.cc 30949 2009-11-02 16:37:58Z ganis $";
00023
00024 #include "XrdClient/XrdClientConnMgr.hh"
00025 #include "XrdClient/XrdClientDebug.hh"
00026 #include "XrdClient/XrdClientMessage.hh"
00027 #include "XrdClient/XrdClientLogConnection.hh"
00028 #include "XrdClient/XrdClientThread.hh"
00029 #include "XrdClient/XrdClientEnv.hh"
00030 #include "XrdClient/XrdClientSid.hh"
00031 #ifdef WIN32
00032 #include "XrdSys/XrdWin32.hh"
00033 #endif
00034
00035 #include <assert.h>
00036
00037 #ifdef AIX
00038 #include <sys/sem.h>
00039 #else
00040 #include <semaphore.h>
00041 #endif
00042
00043
00044 #ifndef WIN32
00045 #include <sys/types.h>
00046 #include <pwd.h>
00047 #endif
00048
00049
00050 #define XRC_MAXVECTSIZE 32767
00051
00052
00053 void * GarbageCollectorThread(void *arg, XrdClientThread *thr)
00054 {
00055
00056
00057
00058 if (thr->MaskSignal(0) != 0)
00059 Error("GarbageCollectorThread", "Warning: problems masking signals");
00060
00061 XrdClientConnectionMgr *thisObj = (XrdClientConnectionMgr *)arg;
00062
00063 thr->SetCancelDeferred();
00064 thr->SetCancelOn();
00065
00066 while (1) {
00067 thr->CancelPoint();
00068
00069 thisObj->GarbageCollect();
00070
00071 thr->CancelPoint();
00072
00073 sleep(30);
00074
00075 }
00076
00077 return 0;
00078 }
00079
00080
00081 int DisconnectElapsedPhyConn(const char *key,
00082 XrdClientPhyConnection *p, void *voidcmgr)
00083 {
00084
00085
00086
00087 XrdClientConnectionMgr *cmgr = (XrdClientConnectionMgr *)voidcmgr;
00088 assert(cmgr != 0);
00089
00090 if (p) {
00091 if ((p->GetLogConnCnt() <= 0) &&
00092 p->ExpiredTTL() && p->IsValid()) {
00093 p->Touch();
00094 p->Disconnect();
00095 }
00096
00097 if (!p->IsValid()) {
00098
00099
00100 p->Touch();
00101 p->Disconnect();
00102
00103
00104 cmgr->fPhyTrash.Push_back(p);
00105
00106
00107 return -1;
00108 }
00109 }
00110
00111
00112 return 0;
00113 }
00114
00115
00116
00117
00118 int DumpPhyConn(const char *key,
00119 XrdClientPhyConnection *p, void *voidcmgr)
00120 {
00121
00122 if (!p) {
00123 Info(XrdClientDebug::kUSERDEBUG, "DumpPhyConn", "Phyconn entry, key=NULL");
00124 return 0;
00125 }
00126
00127 Info(XrdClientDebug::kUSERDEBUG, "DumpPhyConn", "Phyconn entry, key='" <<
00128 (key ? key : "***def***") <<
00129 "', LogCnt=" << p->GetLogConnCnt() << (p->IsValid() ? " Valid" : " NotValid") )
00130
00131
00132 return 0;
00133 }
00134
00135
00136 int DestroyPhyConn(const char *key,
00137 XrdClientPhyConnection *p, void *voidcmgr)
00138 {
00139
00140
00141 XrdClientConnectionMgr *cmgr = (XrdClientConnectionMgr *)voidcmgr;
00142 assert(cmgr != 0);
00143
00144 if (p) {
00145 p->UnsolicitedMsgHandler = 0;
00146 delete(p);
00147 }
00148
00149
00150 return -1;
00151 }
00152
00153
00154
00155 XrdClientConnectionMgr::XrdClientConnectionMgr() : fSidManager(0),
00156 fGarbageColl(0)
00157 {
00158
00159
00160
00161
00162 fLastLogIdUsed = 0;
00163
00164 fGarbageColl = new XrdClientThread(GarbageCollectorThread);
00165
00166 if (!fGarbageColl)
00167 Error("ConnectionMgr",
00168 "Can't create garbage collector thread: out of system resources");
00169
00170 fGarbageColl->Run(this);
00171
00172
00173 fSidManager = new XrdClientSid();
00174 if (!fSidManager) {
00175 Error("ConnectionMgr",
00176 "Can't create sid manager: out of system resources");
00177 abort();
00178 }
00179
00180 }
00181
00182
00183 XrdClientConnectionMgr::~XrdClientConnectionMgr()
00184 {
00185
00186
00187 int i=0;
00188
00189 {
00190 XrdSysMutexHelper mtx(fMutex);
00191
00192 for (i = 0; i < fLogVec.GetSize(); i++)
00193 if (fLogVec[i]) Disconnect(i, FALSE);
00194
00195 }
00196
00197 if (fGarbageColl) {
00198 void *ret;
00199 fGarbageColl->Cancel();
00200 fGarbageColl->Join(&ret);
00201 delete fGarbageColl;
00202 }
00203
00204 GarbageCollect();
00205
00206 fPhyHash.Apply(DestroyPhyConn, this);
00207 delete fSidManager;
00208 }
00209
00210
00211 void XrdClientConnectionMgr::GarbageCollect()
00212 {
00213
00214
00215
00216
00217
00218
00219 XrdSysMutexHelper mtx(fMutex);
00220
00221 if (fPhyHash.Num() > 0) {
00222
00223 if(DebugLevel() >= XrdClientDebug::kUSERDEBUG)
00224 fPhyHash.Apply(DumpPhyConn, this);
00225
00226
00227 fPhyHash.Apply(DisconnectElapsedPhyConn, this);
00228
00229 }
00230
00231
00232
00233
00234 for (int i = fPhyTrash.GetSize()-1; i >= 0; i--) {
00235
00236 DumpPhyConn("Trashed connection",
00237 fPhyTrash[i], this);
00238
00239 if ( !fPhyTrash[i] ||
00240 ((fPhyTrash[i]->GetLogConnCnt() <= 0) && (fPhyTrash[i]->ExpiredTTL())) ) {
00241
00242 if (fPhyTrash[i] && (fPhyTrash[i]->GetReaderThreadsCnt() <= 0)) {
00243 delete fPhyTrash[i];
00244
00245
00246 }
00247
00248
00249 fPhyTrash.Erase(i);
00250 }
00251 }
00252
00253 }
00254
00255
00256 int XrdClientConnectionMgr::Connect(XrdClientUrlInfo RemoteServ)
00257 {
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268 XrdClientLogConnection *logconn = 0;
00269 XrdClientPhyConnection *phyconn = 0;
00270 CndVarInfo *cnd = 0;
00271
00272 int newid = -1;
00273 bool phyfound = FALSE;
00274
00275
00276 Info(XrdClientDebug::kHIDEBUG,
00277 "Connect", "Creating a logical connection...");
00278
00279 logconn = new XrdClientLogConnection(fSidManager);
00280 if (!logconn) {
00281 Error("Connect", "Object creation failed. Aborting.");
00282 abort();
00283 }
00284
00285
00286 if (RemoteServ.User.length() <= 0) {
00287 #ifndef WIN32
00288 struct passwd *pw = getpwuid(getuid());
00289 RemoteServ.User = (pw) ? pw->pw_name : "";
00290 #else
00291 char name[256];
00292 DWORD length = sizeof (name);
00293 ::GetUserName(name, &length);
00294 RemoteServ.User = name;
00295 #endif
00296 }
00297
00298
00299 XrdOucString key;
00300 XrdOucString key1(RemoteServ.User.c_str(), 256); key1 += '@';
00301 key1 += RemoteServ.Host; key1 += ':'; key1 += RemoteServ.Port;
00302 XrdOucString key2(RemoteServ.User.c_str(), 256); key2 += '@';
00303 key2 += RemoteServ.HostAddr; key2 += ':'; key2 += RemoteServ.Port;
00304
00305 do {
00306
00307
00308 fMutex.Lock();
00309 cnd = 0;
00310
00311 cnd = fConnectingCondVars.Find(key1.c_str());
00312 if (!cnd) cnd = fConnectingCondVars.Find(key2.c_str());
00313
00314
00315 if (!cnd) {
00316
00317
00318
00319 if (fPhyHash.Num() > 0) {
00320 XrdClientPhyConnection *p = 0;
00321
00322
00323 GarbageCollect();
00324
00325 if (((p = fPhyHash.Find(key1.c_str())) ||
00326 (p = fPhyHash.Find(key2.c_str()))) && p->IsValid()) {
00327
00328 phyconn = p;
00329 phyconn->CountLogConn();
00330 phyconn->Touch();
00331 logconn->SetPhyConnection(phyconn);
00332
00333 phyfound = TRUE;
00334 }
00335 else {
00336
00337
00338
00339 fConnectingCondVars.Rep(key1.c_str(), new CndVarInfo(), 0, Hash_keepdata);
00340 }
00341 }
00342
00343 fMutex.UnLock();
00344 }
00345 else {
00346
00347
00348 cnd->cv.Lock();
00349 cnd->cnt++;
00350 fMutex.UnLock();
00351 cnd->cv.Wait();
00352 cnd->cnt--;
00353 cnd->cv.UnLock();
00354 }
00355
00356 } while (cnd);
00357
00358
00359
00360 if (!phyfound) {
00361
00362 Info(XrdClientDebug::kHIDEBUG,
00363 "Connect",
00364 "Physical connection not found. Creating a new one...");
00365
00366 if (DebugLevel() >= XrdClientDebug::kHIDEBUG)
00367 fPhyHash.Apply(DumpPhyConn, this);
00368
00369
00370
00371
00372
00373
00374
00375 if (!(phyconn = new XrdClientPhyConnection(this, fSidManager))) {
00376 Error("Connect", "Object creation failed. Aborting.");
00377 abort();
00378 }
00379 if ( phyconn && phyconn->Connect(RemoteServ) ) {
00380
00381 phyconn->CountLogConn();
00382 logconn->SetPhyConnection(phyconn);
00383
00384 if (DebugLevel() >= XrdClientDebug::kHIDEBUG)
00385 Info(XrdClientDebug::kHIDEBUG,
00386 "Connect",
00387 "New physical connection to server " <<
00388 RemoteServ.Host << ":" << RemoteServ.Port <<
00389 " succesfully created.");
00390
00391 } else {
00392
00393
00394 {
00395 XrdSysMutexHelper mtx(fMutex);
00396 int cnt;
00397
00398 key = key1;
00399 cnd = fConnectingCondVars.Find(key.c_str());
00400 if (!cnd) { key = key2; cnd = fConnectingCondVars.Find(key.c_str()); }
00401 if (cnd) {
00402 cnd->cv.Lock();
00403 cnd->cv.Broadcast();
00404 fConnectingCondVars.Del(key.c_str());
00405 cnt = cnd->cnt;
00406 cnd->cv.UnLock();
00407
00408 if (!cnt) {
00409 Info(XrdClientDebug::kHIDEBUG, "Connect",
00410 "Destroying connection condvar for " << key );
00411 delete cnd;
00412 }
00413 }
00414 }
00415 delete logconn;
00416
00417 phyconn->Touch();
00418 fPhyTrash.Push_back(phyconn);
00419
00420
00421 return -1;
00422 }
00423
00424 }
00425
00426
00427
00428
00429 {
00430 XrdSysMutexHelper mtx(fMutex);
00431 phyconn->WipeStreamid(logconn->Streamid());
00432
00433
00434 if (!phyfound) {
00435 if (!phyconn)
00436 Error("Connect"," problems connecting to " << key1);
00437 fPhyHash.Rep(key1.c_str(), phyconn, 0, Hash_keepdata);
00438 }
00439
00440 if (fLogVec.GetSize() < XRC_MAXVECTSIZE) {
00441
00442 fLogVec.Push_back(logconn);
00443
00444 newid = fLogVec.GetSize()-1;
00445 }
00446 else {
00447
00448 newid = -1;
00449 for (int i = 0; i < fLogVec.GetSize(); i++) {
00450 int idx = (fLastLogIdUsed + i) % fLogVec.GetSize();
00451 if (!fLogVec[idx]) {
00452 fLogVec[idx] = logconn;
00453 newid = idx;
00454 fLastLogIdUsed = idx;
00455 break;
00456 }
00457 }
00458 if (newid == -1) {
00459 delete logconn;
00460 Error("Connect", "Critical error - Out of allocated resources:"
00461 " max number allowed of logical connections reached ("<<XRC_MAXVECTSIZE<<")");
00462 return -1;
00463 }
00464 }
00465
00466
00467 if (DebugLevel() >= XrdClientDebug::kHIDEBUG) {
00468
00469 int logCnt = 0;
00470 for (int i=0; i < fLogVec.GetSize(); i++)
00471 if (fLogVec[i])
00472 logCnt++;
00473
00474 Info(XrdClientDebug::kHIDEBUG, "Connect",
00475 "LogConn: size:" << fLogVec.GetSize() << " count: " << logCnt <<
00476 "PhyConn: size:" << fPhyHash.Num());
00477 }
00478
00479
00480
00481
00482 int cnt;
00483
00484 key = key1;
00485 cnd = fConnectingCondVars.Find(key.c_str());
00486 if (!cnd) { key = key2; cnd = fConnectingCondVars.Find(key.c_str()); }
00487 if (cnd) {
00488 cnd->cv.Lock();
00489 cnd->cv.Broadcast();
00490 fConnectingCondVars.Del(key.c_str());
00491 cnt = cnd->cnt;
00492 cnd->cv.UnLock();
00493
00494 if (!cnt) {
00495 Info(XrdClientDebug::kHIDEBUG, "Connect",
00496 "Destroying connection condvar for " << key );
00497 delete cnd;
00498 }
00499 }
00500
00501
00502 }
00503
00504 return newid;
00505 }
00506
00507
00508 void XrdClientConnectionMgr::Disconnect(int LogConnectionID,
00509 bool ForcePhysicalDisc)
00510 {
00511
00512
00513 if (LogConnectionID < 0) return;
00514
00515 {
00516 XrdSysMutexHelper mtx(fMutex);
00517
00518 if ((LogConnectionID < 0) ||
00519 (LogConnectionID >= fLogVec.GetSize()) || (!fLogVec[LogConnectionID])) {
00520 Error("Disconnect", "Destroying nonexistent logconn " << LogConnectionID);
00521 return;
00522 }
00523
00524
00525 if (ForcePhysicalDisc) {
00526
00527
00528
00529
00530
00531 fLogVec[LogConnectionID]->GetPhyConnection()->UnsolicitedMsgHandler = 0;
00532 fLogVec[LogConnectionID]->GetPhyConnection()->Disconnect();
00533 GarbageCollect();
00534 }
00535 else
00536 fLogVec[LogConnectionID]->GetPhyConnection()->WipeStreamid(fLogVec[LogConnectionID]->Streamid());
00537
00538 fLogVec[LogConnectionID]->GetPhyConnection()->Touch();
00539 delete fLogVec[LogConnectionID];
00540 fLogVec[LogConnectionID] = 0;
00541
00542 Info(XrdClientDebug::kHIDEBUG, "Disconnect",
00543 " LogConnID: " << LogConnectionID <<" destroyed");
00544 }
00545
00546 }
00547
00548
00549 int XrdClientConnectionMgr::ReadRaw(int LogConnectionID, void *buffer,
00550 int BufferLength)
00551 {
00552
00553
00554 XrdClientLogConnection *logconn;
00555
00556 logconn = GetConnection(LogConnectionID);
00557
00558 if (logconn) {
00559 return logconn->ReadRaw(buffer, BufferLength);
00560 }
00561 else {
00562 Error("ReadRaw", "There's not a logical connection with id " <<
00563 LogConnectionID);
00564
00565 return(TXSOCK_ERR);
00566 }
00567 }
00568
00569
00570 XrdClientMessage *XrdClientConnectionMgr::ReadMsg(int LogConnectionID)
00571 {
00572 XrdClientLogConnection *logconn;
00573 XrdClientMessage *mex;
00574
00575 logconn = GetConnection(LogConnectionID);
00576
00577
00578
00579 mex = logconn->GetPhyConnection()->ReadMessage(logconn->Streamid());
00580
00581
00582 return mex;
00583 }
00584
00585
00586 int XrdClientConnectionMgr::WriteRaw(int LogConnectionID, const void *buffer,
00587 int BufferLength, int substreamid) {
00588
00589
00590 XrdClientLogConnection *logconn;
00591
00592 logconn = GetConnection(LogConnectionID);
00593
00594 if (logconn) {
00595 return logconn->WriteRaw(buffer, BufferLength, substreamid);
00596 }
00597 else {
00598 Error("WriteRaw", "There's not a logical connection with id " <<
00599 LogConnectionID);
00600
00601 return(TXSOCK_ERR);
00602 }
00603 }
00604
00605
00606 XrdClientLogConnection *XrdClientConnectionMgr::GetConnection( int LogConnectionID)
00607 {
00608
00609 XrdSysMutexHelper mtx(fMutex);
00610
00611 return (LogConnectionID > -1) ? fLogVec[LogConnectionID] : (XrdClientLogConnection *)0;
00612 }
00613
00614
00615 XrdClientPhyConnection *XrdClientConnectionMgr::GetPhyConnection(XrdClientUrlInfo server)
00616 {
00617
00618
00619
00620 XrdClientPhyConnection *p = 0;
00621
00622
00623 if (server.User.length() <= 0) {
00624 #ifndef WIN32
00625 struct passwd *pw = getpwuid(getuid());
00626 server.User = (pw) ? pw->pw_name : "";
00627 #else
00628 char name[256];
00629 DWORD length = sizeof (name);
00630 ::GetUserName(name, &length);
00631 server.User = name;
00632 #endif
00633 }
00634
00635
00636 XrdOucString key;
00637 XrdOucString key1(server.User.c_str(), 256); key1 += '@';
00638 key1 += server.Host; key1 += ':'; key1 += server.Port;
00639 XrdOucString key2(server.User.c_str(), 256); key2 += '@';
00640 key2 += server.HostAddr; key2 += ':'; key2 += server.Port;
00641
00642 if (fPhyHash.Num() > 0) {
00643 if (((p = fPhyHash.Find(key1.c_str())) ||
00644 (p = fPhyHash.Find(key2.c_str()))) && !(p->IsValid())) {
00645
00646 p = 0;
00647 }
00648 }
00649
00650
00651 return p;
00652 }
00653
00654
00655 UnsolRespProcResult XrdClientConnectionMgr::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *sender,
00656 XrdClientMessage *unsolmsg)
00657 {
00658
00659
00660
00661
00662
00663
00664
00665
00666 UnsolRespProcResult res = kUNSOL_CONTINUE;
00667
00668
00669
00670
00671
00672
00673
00674 {
00675
00676 XrdSysMutexHelper mtx(fMutex);
00677
00678 for (int i = 0; i < fLogVec.GetSize(); i++) {
00679
00680 if ( fLogVec[i] && (fLogVec[i]->GetPhyConnection() == sender) ) {
00681 fMutex.UnLock();
00682 res = fLogVec[i]->ProcessUnsolicitedMsg(sender, unsolmsg);
00683 fMutex.Lock();
00684
00685 if (res != kUNSOL_CONTINUE) break;
00686 }
00687 }
00688 }
00689 return res;
00690 }