00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "TMonitor.h"
00026 #include "TSocket.h"
00027 #include "TList.h"
00028 #include "TSystem.h"
00029 #include "TSysEvtHandler.h"
00030 #include "TTimer.h"
00031 #include "TError.h"
00032
00033
00034
00035
00036
00037
00038
00039 class TSocketHandler : public TFileHandler {
00040 private:
00041 TMonitor *fMonitor;
00042 TSocket *fSocket;
00043
00044 public:
00045 TSocketHandler(TMonitor *m, TSocket *s, Int_t interest, Bool_t mainloop = kTRUE);
00046 Bool_t Notify();
00047 Bool_t ReadNotify() { return Notify(); }
00048 Bool_t WriteNotify() { return Notify(); }
00049 TSocket *GetSocket() const { return fSocket; }
00050 };
00051
00052 TSocketHandler::TSocketHandler(TMonitor *m, TSocket *s,
00053 Int_t interest, Bool_t mainloop)
00054 : TFileHandler(s->GetDescriptor(), interest)
00055 {
00056
00057 fMonitor = m;
00058 fSocket = s;
00059
00060 if (mainloop)
00061 Add();
00062 }
00063
00064 Bool_t TSocketHandler::Notify()
00065 {
00066
00067 fMonitor->SetReady(fSocket);
00068 return kTRUE;
00069 }
00070
00071
00072
00073
00074
00075
00076 class TTimeOutTimer : public TTimer {
00077 private:
00078 TMonitor *fMonitor;
00079
00080 public:
00081 TTimeOutTimer(TMonitor *m, Long_t ms);
00082 Bool_t Notify();
00083 };
00084
00085 TTimeOutTimer::TTimeOutTimer(TMonitor *m, Long_t ms)
00086 : TTimer(ms, kTRUE)
00087 {
00088
00089 fMonitor = m;
00090 gSystem->AddTimer(this);
00091 }
00092
00093 Bool_t TTimeOutTimer::Notify()
00094 {
00095
00096 fMonitor->SetReady((TSocket *)-1);
00097 Remove();
00098 return kTRUE;
00099 }
00100
00101
00102
00103 ClassImp(TMonitor)
00104
00105
00106 TMonitor::TMonitor(Bool_t mainloop) : TObject() , TQObject()
00107 {
00108
00109
00110
00111 R__ASSERT(gSystem);
00112
00113 fActive = new TList;
00114 fDeActive = new TList;
00115 fMainLoop = mainloop;
00116 fInterrupt = kFALSE;
00117 }
00118
00119
00120 TMonitor::TMonitor(const TMonitor &m) : TObject() , TQObject()
00121 {
00122
00123
00124 TSocketHandler *sh = 0;
00125
00126 fActive = new TList;
00127 TIter nxa(m.fActive);
00128 while ((sh = (TSocketHandler *)nxa())) {
00129 Int_t mask = 0;
00130 if (sh->HasReadInterest()) mask |= 0x1;
00131 if (sh->HasWriteInterest()) mask |= 0x2;
00132 fActive->Add(new TSocketHandler(this, sh->GetSocket(), mask, m.fMainLoop));
00133 }
00134
00135 fDeActive = new TList;
00136 TIter nxd(m.fDeActive);
00137 while ((sh = (TSocketHandler *)nxd())) {
00138 Int_t mask = 0;
00139 if (sh->HasReadInterest()) mask |= 0x1;
00140 if (sh->HasWriteInterest()) mask |= 0x2;
00141 fDeActive->Add(new TSocketHandler(this, sh->GetSocket(), mask, m.fMainLoop));
00142 }
00143
00144 fMainLoop = m.fMainLoop;
00145 fInterrupt = m.fInterrupt;
00146 fReady = 0;
00147 }
00148
00149
00150 TMonitor::~TMonitor()
00151 {
00152
00153
00154 fActive->Delete();
00155 SafeDelete(fActive);
00156
00157 fDeActive->Delete();
00158 SafeDelete(fDeActive);
00159 }
00160
00161
00162 void TMonitor::Add(TSocket *sock, Int_t interest)
00163 {
00164
00165
00166
00167
00168
00169 fActive->Add(new TSocketHandler(this, sock, interest, fMainLoop));
00170 }
00171
00172
00173 void TMonitor::SetInterest(TSocket *sock, Int_t interest)
00174 {
00175
00176
00177
00178
00179
00180
00181 TSocketHandler *s = 0;
00182
00183 if (!interest)
00184 interest = kRead;
00185
00186
00187 TIter next(fActive);
00188 while ((s = (TSocketHandler *) next())) {
00189 if (sock == s->GetSocket()) {
00190 s->SetInterest(interest);
00191 return;
00192 }
00193 }
00194
00195
00196 TIter next1(fDeActive);
00197 while ((s = (TSocketHandler *) next1())) {
00198 if (sock == s->GetSocket()) {
00199 fDeActive->Remove(s);
00200 fActive->Add(s);
00201 s->SetInterest(interest);
00202 return;
00203 }
00204 }
00205
00206
00207 fActive->Add(new TSocketHandler(this, sock, interest, fMainLoop));
00208 }
00209
00210
00211 void TMonitor::Remove(TSocket *sock)
00212 {
00213
00214
00215 TIter next(fActive);
00216 TSocketHandler *s;
00217
00218 while ((s = (TSocketHandler *) next())) {
00219 if (sock == s->GetSocket()) {
00220 fActive->Remove(s);
00221 delete s;
00222 return;
00223 }
00224 }
00225
00226 TIter next1(fDeActive);
00227
00228 while ((s = (TSocketHandler *) next1())) {
00229 if (sock == s->GetSocket()) {
00230 fDeActive->Remove(s);
00231 delete s;
00232 return;
00233 }
00234 }
00235 }
00236
00237
00238 void TMonitor::RemoveAll()
00239 {
00240
00241
00242 fActive->Delete();
00243 fDeActive->Delete();
00244 }
00245
00246
00247 void TMonitor::Activate(TSocket *sock)
00248 {
00249
00250
00251 TIter next(fDeActive);
00252 TSocketHandler *s;
00253
00254 while ((s = (TSocketHandler *) next())) {
00255 if (sock == s->GetSocket()) {
00256 fDeActive->Remove(s);
00257 fActive->Add(s);
00258 s->Add();
00259 return;
00260 }
00261 }
00262 }
00263
00264
00265 void TMonitor::ActivateAll()
00266 {
00267
00268
00269 TIter next(fDeActive);
00270 TSocketHandler *s;
00271
00272 while ((s = (TSocketHandler *) next())) {
00273 fActive->Add(s);
00274 s->Add();
00275 }
00276 fDeActive->Clear();
00277 fInterrupt = kFALSE;
00278 }
00279
00280
00281 void TMonitor::DeActivate(TSocket *sock)
00282 {
00283
00284
00285 TIter next(fActive);
00286 TSocketHandler *s;
00287
00288 while ((s = (TSocketHandler *) next())) {
00289 if (sock == s->GetSocket()) {
00290 fActive->Remove(s);
00291 fDeActive->Add(s);
00292 s->Remove();
00293 return;
00294 }
00295 }
00296 }
00297
00298
00299 void TMonitor::DeActivateAll()
00300 {
00301
00302
00303 TIter next(fActive);
00304 TSocketHandler *s;
00305
00306 while ((s = (TSocketHandler *) next())) {
00307 fDeActive->Add(s);
00308 s->Remove();
00309 }
00310 fActive->Clear();
00311 fInterrupt = kFALSE;
00312 }
00313
00314
00315 TSocket *TMonitor::Select()
00316 {
00317
00318
00319
00320
00321
00322
00323 fReady = 0;
00324
00325 while (!fReady && !fInterrupt)
00326 gSystem->InnerLoop();
00327
00328
00329 if (fInterrupt) {
00330 fInterrupt = kFALSE;
00331 fReady = 0;
00332 Info("Select","*** interrupt occured ***");
00333 }
00334
00335 return fReady;
00336 }
00337
00338
00339 TSocket *TMonitor::Select(Long_t timeout)
00340 {
00341
00342
00343
00344
00345
00346
00347
00348
00349 if (timeout < 0)
00350 return TMonitor::Select();
00351
00352 fReady = 0;
00353
00354 TTimeOutTimer t(this, timeout);
00355
00356 while (!fReady && !fInterrupt)
00357 gSystem->InnerLoop();
00358
00359
00360 if (fInterrupt) {
00361 fInterrupt = kFALSE;
00362 fReady = 0;
00363 Info("Select","*** interrupt occured ***");
00364 }
00365
00366 return fReady;
00367 }
00368
00369
00370 Int_t TMonitor::Select(TList *rdready, TList *wrready, Long_t timeout)
00371 {
00372
00373
00374
00375
00376
00377
00378 Int_t nr = -2;
00379
00380 TSocketHandler *h = 0;
00381 Int_t ns = fActive->GetSize();
00382 if (ns == 1) {
00383
00384 h = (TSocketHandler *)fActive->First();
00385 nr = gSystem->Select((TFileHandler *)h, timeout);
00386 } else if (ns > 1) {
00387 nr = gSystem->Select(fActive, timeout);
00388 }
00389
00390 if (nr > 0 && (rdready || wrready)) {
00391
00392 if (rdready)
00393 rdready->Clear();
00394 if (wrready)
00395 wrready->Clear();
00396
00397 if (!h) {
00398 TIter next(fActive);
00399 while ((h = (TSocketHandler *)next())) {
00400 if (rdready && h->IsReadReady())
00401 rdready->Add(h->GetSocket());
00402 if (wrready && h->IsWriteReady())
00403 wrready->Add(h->GetSocket());
00404 }
00405 } else {
00406 if (rdready && h->IsReadReady())
00407 rdready->Add(h->GetSocket());
00408 if (wrready && h->IsWriteReady())
00409 wrready->Add(h->GetSocket());
00410 }
00411 }
00412
00413 return nr;
00414 }
00415
00416
00417 void TMonitor::SetReady(TSocket *sock)
00418 {
00419
00420
00421
00422
00423
00424 fReady = sock;
00425 Ready(fReady);
00426 }
00427
00428
00429 Int_t TMonitor::GetActive(Long_t timeout) const
00430 {
00431
00432
00433
00434
00435
00436
00437
00438
00439 if (timeout >= 0) {
00440 TIter next(fActive);
00441 TSocketHandler *s;
00442 if (timeout > 0) {
00443 TTimeStamp now;
00444 while ((s = (TSocketHandler *) next())) {
00445 TSocket *xs = s->GetSocket();
00446 TTimeStamp ts = xs->GetLastUsage();
00447 Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
00448 (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
00449 if (dt > timeout) {
00450 Info("GetActive", "socket: %p: %s:%d did not show any activity"
00451 " during the last %ld millisecs: deactivating",
00452 xs, xs->GetInetAddress().GetHostName(),
00453 xs->GetInetAddress().GetPort(), timeout);
00454 fActive->Remove(s);
00455 fDeActive->Add(s);
00456 s->Remove();
00457 }
00458 }
00459 } else if (timeout == 0) {
00460
00461 while ((s = (TSocketHandler *) next())) {
00462 s->GetSocket()->Touch();
00463 }
00464 }
00465 }
00466 return fActive->GetSize();
00467 }
00468
00469
00470 Int_t TMonitor::GetDeActive() const
00471 {
00472
00473
00474 return fDeActive->GetSize();
00475 }
00476
00477
00478 Bool_t TMonitor::IsActive(TSocket *sock) const
00479 {
00480
00481
00482
00483 TIter next(fActive);
00484 while (TSocketHandler *h = (TSocketHandler*) next())
00485 if (sock == h->GetSocket())
00486 return kTRUE;
00487
00488
00489 return kFALSE;
00490 }
00491
00492
00493 TList *TMonitor::GetListOfActives() const
00494 {
00495
00496
00497
00498
00499 TList *list = new TList;
00500
00501 TIter next(fActive);
00502
00503 while (TSocketHandler *h = (TSocketHandler*) next())
00504 list->Add(h->GetSocket());
00505
00506 return list;
00507 }
00508
00509
00510 TList *TMonitor::GetListOfDeActives() const
00511 {
00512
00513
00514
00515
00516 TList *list = new TList;
00517
00518 TIter next(fDeActive);
00519
00520 while (TSocketHandler *h = (TSocketHandler*) next())
00521 list->Add(h->GetSocket());
00522
00523 return list;
00524 }
00525
00526
00527 void TMonitor::Ready(TSocket *sock)
00528 {
00529
00530
00531 Emit("Ready(TSocket*)", (Long_t)sock);
00532 }