00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4AnalysisClientImp.h"
00017
00018 #include <iostream.h>
00019
00020 #include "TApplication.h"
00021 #include "TStopwatch.h"
00022
00023 #include "TGo4CintLockTimer.h"
00024 #include "TSysEvtHandler.h"
00025 #include "TTimer.h"
00026
00027 #include "Go4Log/TGo4Log.h"
00028 #include "Go4LockGuard/TGo4LockGuard.h"
00029
00030 #include "Go4CommandsBase/TGo4CommandInvoker.h"
00031 #include "Go4TaskHandler/TGo4TaskHandler.h"
00032 #include "Go4TaskHandler/TGo4ClientStatus.h"
00033 #include "Go4StatusAnalysis/TGo4AnalysisClientStatus.h"
00034 #include "Go4StatusAnalysis/TGo4AnalysisObjectNames.h"
00035 #include "Go4HistogramServer/TGo4HistogramServer.h"
00036 #include "Go4StatusAnalysis/TGo4AnalysisStatus.h"
00037 #include "Go4Analysis/TGo4AnalysisImp.h"
00038
00039 #include "TGo4AnalysisMainRunnable.h"
00040 #include "TGo4AnalysisWatchRunnable.h"
00041 #include "TGo4Ratemeter.h"
00042 #include "Go4TaskHandler/TGo4TaskStatus.h"
00043
00044
00045
00046
00047 class TGo4InterruptHandler : public TSignalHandler {
00048 public:
00049 TGo4InterruptHandler(TSignalHandler* old = 0) :
00050 TSignalHandler(kSigInterrupt, kFALSE),
00051 oldHandler(old),
00052 fbActive(kFALSE)
00053 {
00054 }
00055 virtual Bool_t Notify()
00056 {
00057
00058 if (fbActive) return kTRUE;
00059 fbActive = kTRUE;
00060 Bool_t res = kTRUE;
00061 if (TGo4Analysis::Instance()!=0) {
00062 TGo4Analysis::Instance()->StopWaiting();
00063
00064 }
00065 if (oldHandler!=0) res = oldHandler->Notify();
00066 TTimer::SingleShot(3000, ClassName(), this, "Pop()");
00067 return res;
00068 }
00069
00070 virtual void Pop()
00071 {
00072
00073 fbActive = kFALSE;
00074 TGo4Analysis::Instance()->SetRunning(kFALSE);
00075 }
00076
00077 protected:
00078 TSignalHandler* oldHandler;
00079 Bool_t fbActive;
00080 };
00081
00082
00083
00084 const Text_t TGo4AnalysisClient::fgcWATCHTHREAD[]="WATCH-";
00085 const Text_t TGo4AnalysisClient::fgcMAINTHREAD[]="MAIN-";
00086 const UInt_t TGo4AnalysisClient::fguSTATUSUPDATE = 1000;
00087 const Double_t TGo4AnalysisClient::fgdSTATUSTIMEOUT = 2;
00088 const UInt_t TGo4AnalysisClient::fguCINTTIMERPERIOD = 200;
00089
00090 TGo4AnalysisClient::TGo4AnalysisClient(const char* name,
00091 TGo4Analysis* analysis,
00092 const char* host,
00093 UInt_t negport,
00094 Bool_t histoserver,
00095 const char* basename,
00096 const char* passwd,
00097 Bool_t servermode,
00098 Bool_t autorun,
00099 Bool_t clientmode)
00100 : TGo4Slave(name, servermode, host, negport),
00101 fdBufferUpdateTime(0), fxHistoServer(0), fbAutoStart(autorun), fbCintMode(kFALSE),fxCintLockTimer(0)
00102
00103 {
00104 TRACE((15,"TGo4AnalysisClient::TGo4AnalysisClient(const char*,...)",__LINE__, __FILE__));
00105
00106 if(analysis==0)
00107 {
00108 TGo4Log::Debug("!!! AnalysisClient ''%s'': no external analysis specified !!",GetName());
00109 fxAnalysis=TGo4Analysis::Instance();
00110 }
00111 else
00112 {
00113 fxAnalysis=analysis;
00114 }
00115 fxAnalysis->SetAnalysisClient(this);
00116 Constructor(histoserver,basename,passwd);
00117 SetCintMode(clientmode);
00118
00119 if (clientmode) {
00120 TSignalHandler* oldhandler = gApplication->GetSignalHandler();
00121 oldhandler->Remove();
00122
00123 fxInterruptHandler = new TGo4InterruptHandler(oldhandler);
00124 fxInterruptHandler->Add();
00125 }
00126 }
00127
00128 TGo4AnalysisClient::TGo4AnalysisClient(int argc, char** argv,
00129 TGo4Analysis* analysis,
00130 Bool_t histoserver,
00131 const char* basename,
00132 const char* passwd,
00133 Bool_t servermode,
00134 Bool_t autorun)
00135 : TGo4Slave(argv[2],servermode,
00136 argv[3] , (argc>4) ? atoi(argv[4]) : 5000),
00137 fdBufferUpdateTime(0), fxHistoServer(0), fbAutoStart(autorun), fbCintMode(kFALSE),fxCintLockTimer(0)
00138
00139 {
00140 TRACE((15,"TGo4AnalysisClient::TGo4AnalysisClient(int, char**...)",__LINE__, __FILE__));
00141
00142 if(argc<5)
00143 {
00144 TGo4Log::Error("!!! AnalysisClient: missing commandline arguments, aborting !!");
00145 gApplication->Terminate();
00146 }
00147 if(!strcmp("-lGUI",argv[1]))
00148 {
00149 TGo4Log::Error(" !!! AnalysisClient: GUI mode not specified, aborting !!");
00150 gApplication->Terminate();
00151 }
00152
00153 if(analysis==0)
00154 {
00155 TGo4Log::Debug(" !!! AnalysisClient ''%s'': no external analysis specified !!",GetName());
00156 fxAnalysis=TGo4Analysis::Instance();
00157 }
00158 else
00159 {
00160 fxAnalysis=analysis;
00161 }
00162 fxAnalysis->SetAnalysisClient(this);
00163 Constructor(histoserver,basename,passwd);
00164 }
00165
00166 void TGo4AnalysisClient::Constructor(Bool_t starthistserv, const char* basename, const char* passwd)
00167 {
00168 fxUpdateWatch = new TStopwatch;
00169 fxRatemeter= new TGo4Ratemeter;
00170 TGo4Log::Debug(" AnalysisClient ''%s'' started ",GetName());
00171 fcMainName= new Text_t[TGo4ThreadManager::fguTEXTLENGTH];
00172 fcWatchName= new Text_t[TGo4ThreadManager::fguTEXTLENGTH];
00173 Text_t namebuffer[TGo4ThreadManager::fguTEXTLENGTH];
00174 snprintf(namebuffer, TGo4ThreadManager::fguTEXTLENGTH-1 ,"MainRunnable of %s",GetName());
00175 TGo4AnalysisMainRunnable* mainrun= new TGo4AnalysisMainRunnable(namebuffer, this);
00176 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1,"WatchRunnable of %s",GetName());
00177 TGo4AnalysisWatchRunnable* watchrun= new TGo4AnalysisWatchRunnable(namebuffer, this);
00178
00179 snprintf(fcMainName, TGo4ThreadManager::fguTEXTLENGTH-1,"%s%s",fgcMAINTHREAD,GetName());
00180 TGo4ThreadHandler* th=GetThreadHandler();
00181 th->NewThread(fcMainName,mainrun);
00182 snprintf(fcWatchName, TGo4ThreadManager::fguTEXTLENGTH-1,"%s%s",fgcWATCHTHREAD,GetName());
00183 th->NewThread(fcWatchName,watchrun);
00184 TGo4CommandInvoker::Instance();
00185 TGo4CommandInvoker::Register("AnalysisClient",this);
00186 TGo4Slave::Stop();
00187 UpdateStatusBuffer();
00188 if(starthistserv) StartObjectServer(basename, passwd);
00189 GetTask()->Launch();
00190
00191 fxInterruptHandler = 0;
00192 }
00193
00194
00195 TGo4AnalysisClient::~TGo4AnalysisClient()
00196 {
00197 TRACE((15,"TGo4AnalysisClient::~TGo4AnalysisClient()",__LINE__, __FILE__));
00198
00199 if (fxInterruptHandler!=0) {
00200
00201 fxInterruptHandler->Remove();
00202 delete fxInterruptHandler;
00203
00204
00205 TSignalHandler* oldhandler = gApplication->GetSignalHandler();
00206 oldhandler->Add();
00207 }
00208
00209 if(GetTask())
00210 {
00211 GetTask()->GetTaskHandler()->DisConnect();
00212 GetTask()->GetTaskHandler()->SetAborting();
00213 }
00214 fxAnalysis->LockAutoSave();
00215 {
00216 StopObjectServer();
00217 if(GetThreadHandler()) GetThreadHandler()->CancelAll();
00218 }
00219 fxAnalysis->UnLockAutoSave();
00220
00221 delete fxCintLockTimer;
00222 delete fxRatemeter;
00223 delete fxUpdateWatch;
00224 delete fxAnalysis;
00225 delete [] fcMainName;
00226 delete [] fcWatchName;
00227 }
00228
00229 Int_t TGo4AnalysisClient::Initialization()
00230 {
00231 TGo4LockGuard mainguard;
00232 SendStatusMessage(1,kTRUE,"AnalysisClient %s starting initialization...",GetName());
00233
00234 if(!fbAutoStart)
00235 {
00236
00237
00238 if(fxAnalysis->LoadStatus())
00239 {
00240
00241 SendStatusMessage(1,kTRUE,"AnalysisClient %s: Status loaded from %s",
00242 GetName(), TGo4Analysis::fgcDEFAULTSTATUSFILENAME);
00243 }
00244 else
00245 {
00246 SendStatusMessage(2,kTRUE,"AnalysisClient %s: Could not load status from %s",
00247 GetName(), TGo4Analysis::fgcDEFAULTSTATUSFILENAME);
00248 }
00249
00250 if(fxAnalysis->LoadObjects())
00251 {
00252 SendStatusMessage(1,kTRUE,"AnalysisClient %s: Objects loaded.",GetName());
00253 }
00254 else
00255 {
00256
00257 SendStatusMessage(2,kTRUE,"AnalysisClient %s: Initialization could not load analysis objects!",GetName());
00258 }
00259
00260 SendStatusMessage(1,kTRUE,"Analysis Slave %s waiting for submit and start commands...",GetName());
00261 TGo4Slave::Stop();
00262 }
00263 else
00264 {
00265
00266
00267
00268 SendStatusMessage(1,kTRUE,"AnalysisSlave %s initializing analysis...",GetName());
00269 if(fxAnalysis->InitEventClasses())
00270 {
00271 if(IsCintMode())
00272 {
00273 SendStatusMessage(1,kTRUE,"Analysis CINTServer %s in MainCycle suspend mode.",GetName());
00274 TGo4Slave::Stop();
00275 }
00276 else
00277 {
00278 SendStatusMessage(1,kTRUE,"AnalysisSlave %s starting analysis...",GetName());
00279 Start();
00280 }
00281 }
00282 else
00283 {
00284 SendStatusMessage(2,kTRUE,"AnalysisSlave %s failed initializing analysis!",GetName());
00285 TGo4Slave::Stop();
00286 }
00287 }
00288 SendAnalysisStatus();
00289 UpdateStatusBuffer();
00290 SendAnalysisClientStatus();
00291 SendStatusMessage(1,kFALSE,"AnalysisClient %s has finished initialization.",GetName());
00292 return 0;
00293 }
00294
00295
00296
00297 void TGo4AnalysisClient::UpdateStatus(TGo4TaskStatus * state)
00298 {
00299 TRACE((12,"TGo4AnalysisClient::UpdateStatus(TGo4ClientStatus*)",__LINE__, __FILE__));
00300 TGo4Slave::UpdateStatus(state);
00301 TGo4AnalysisClientStatus* anstate= dynamic_cast<TGo4AnalysisClientStatus*> (state);
00302 if (anstate)
00303 {
00304 Double_t rate=fxRatemeter->GetRate();
00305 Double_t avrate=fxRatemeter->GetAvRate();
00306 UInt_t n=fxRatemeter->GetCurrentCount();
00307 Double_t t=fxRatemeter->GetTime();
00308 anstate->SetRates(rate, avrate, n,t);
00309 }
00310 else
00311 {
00312
00313 }
00314 }
00315 TGo4TaskStatus* TGo4AnalysisClient::CreateStatus()
00316 {
00317 TRACE((12,"TGo4AnalysisClient::CreateStatus()",__LINE__, __FILE__));
00318
00319 TGo4AnalysisClientStatus* stat= new TGo4AnalysisClientStatus(GetName());
00320 UpdateStatus(stat);
00321 return stat;
00322 }
00323
00324 void TGo4AnalysisClient::Start()
00325 {
00326 TRACE((12,"TGo4AnalysisClient::Start()",__LINE__, __FILE__));
00327 if(fxAnalysis->IsInitDone())
00328 {
00329 if(GetThreadHandler()) GetThreadHandler()->Start(fcMainName);
00330 if(!MainIsRunning()) fxAnalysis->PreLoop();
00331 TGo4Slave::Start();
00332 fxRatemeter->Reset();
00333
00334 fdBufferUpdateTime=fxUpdateWatch->RealTime();
00335 fxUpdateWatch->Continue();
00336 SendStatusMessage(1,kTRUE,"AnalysisClient %s has started analysis processing.",GetName());
00337 UpdateRate(-2);
00338 UpdateStatusBuffer();
00339 SendAnalysisClientStatus();
00340 }
00341 else
00342 {
00343
00344 SendStatusMessage(2,kTRUE,"Analysis %s was not initialized! Please SUBMIT settings first.",fxAnalysis->GetName());
00345 }
00346 }
00347
00348 void TGo4AnalysisClient::SendAnalysisObject(const Text_t * name)
00349 {
00350 TRACE((12,"TGo4AnalysisClient::SendAnalysisObject(Text_t* name)",__LINE__, __FILE__));
00351 TNamed* ob=fxAnalysis->GetObject(name);
00352 SendObject(ob);
00353 }
00354
00355 void TGo4AnalysisClient::SendAnalysisStatus()
00356 {
00357 TRACE((12,"TGo4AnalysisClient::SendAnalysisStatus()",__LINE__, __FILE__));
00358
00359 TGo4Analysis* ana=GetAnalysis();
00360 TGo4Log::Debug(" AnalysisClient - sending current analysis settings ");
00361 if(ana)
00362 {
00363 TGo4AnalysisStatus* state=ana->CreateStatus();
00364 SendStatus(state);
00365 delete state;
00366 }
00367 else
00368 {
00369 SendStatusMessage(3,kFALSE, "ERROR sending analysis status: no analysis ");
00370 }
00371 }
00372
00373 void TGo4AnalysisClient::SendAnalysisClientStatus()
00374 {
00375 TRACE((12,"TGo4AnalysisClient::SendAnalysisClientStatus()",__LINE__, __FILE__));
00376
00377 TGo4Log::Debug(" AnalysisClient - sending current analysis client status ");
00378 SendStatusBuffer();
00379
00380
00381
00382
00383
00384 }
00385
00386 void TGo4AnalysisClient::SendNamesList()
00387 {
00388 TRACE((12,"TGo4AnalysisClient::SendNamesList()",__LINE__, __FILE__));
00389
00390 fxAnalysis->UpdateNamesList();
00391 TGo4AnalysisObjectNames* state= fxAnalysis->GetNamesList();
00392 if(state)
00393 {
00394 TGo4Log::Debug(" AnalysisClient - sending names list ");
00395
00396 SendObject(state);
00397 }
00398 else
00399 {
00400 SendStatusMessage(3,kTRUE,"Analysis Client: Send Names List - ERROR: no nameslist !!! ");
00401 }
00402 }
00403
00404 void TGo4AnalysisClient::KillMain()
00405 {
00406 TRACE((12,"TGo4AnalysisClient::KillMain()",__LINE__, __FILE__));
00407 if(GetThreadHandler()) GetThreadHandler()->Stop(fcMainName);
00408
00409 if(GetTask()) GetTask()->WakeCommandQueue();
00410 if(GetThreadHandler()) GetThreadHandler()->Cancel(fcMainName);
00411 SendStatusMessage(2,kTRUE,"AnalysisClient %s has killed main analysis thread.",GetName());
00412 }
00413
00414 void TGo4AnalysisClient::RestartMain()
00415 {
00416 TRACE((12,"TGo4AnalysisClient::RestartMain()",__LINE__, __FILE__));
00417 if(GetThreadHandler()) GetThreadHandler()->Stop(fcMainName);
00418
00419 if(GetTask()) GetTask()->WakeCommandQueue();
00420 if(GetThreadHandler())
00421 {
00422 GetThreadHandler()->ReCreate(fcMainName);
00423 GetThreadHandler()->Start(fcMainName);
00424 }
00425 fxRatemeter->Reset();
00426 SendStatusMessage(2,kTRUE,"AnalysisClient %s has killed and relaunched main analysis thread.",GetName());
00427 }
00428
00429 void TGo4AnalysisClient::Stop()
00430 {
00431 TRACE((12,"TGo4AnalysisClient::Stop()",__LINE__, __FILE__));
00432 if(MainIsRunning()) fxAnalysis->PostLoop();
00433 TGo4Slave::Stop();
00434 SendStatusMessage(1,kTRUE,"AnalysisClient %s has STOPPED analysis processing.",GetName());
00436 UpdateRate(-1);
00437 UpdateStatusBuffer();
00438
00439 SendAnalysisClientStatus();
00440 }
00441
00442 void TGo4AnalysisClient::UpdateRate(Int_t counts)
00443 {
00444 TRACE((12,"TGo4AnalysisClient::UpdateRate(Int_t)",__LINE__, __FILE__));
00445 fxRatemeter->Update(counts);
00446 }
00447 UInt_t TGo4AnalysisClient::GetCurrentCount()
00448 {
00449 return (fxRatemeter->GetCurrentCount());
00450 }
00451
00452 Bool_t TGo4AnalysisClient::TestRatemeter()
00453 {
00454 return (fxRatemeter->TestUpdate());
00455 }
00456
00457 Bool_t TGo4AnalysisClient::TestBufferUpdateConditions()
00458 {
00459
00460 Double_t currenttime=fxUpdateWatch->RealTime();
00461 fxUpdateWatch->Continue();
00462 Double_t deltatime=currenttime-fdBufferUpdateTime;
00463 UInt_t currentcount=GetCurrentCount();
00464 if( (currentcount && (currentcount % fguSTATUSUPDATE == 0)) || (deltatime>fgdSTATUSTIMEOUT) )
00465 {
00466
00467 fdBufferUpdateTime=currenttime;
00468 return kTRUE;
00469 }
00470 else
00471 {
00472 return kFALSE;
00473 }
00474 }
00475
00476
00477 void TGo4AnalysisClient::StartObjectServer(const Text_t* basename, const Text_t* passwd)
00478 {
00479 StopObjectServer();
00480 fxHistoServer= new TGo4HistogramServer(this,basename,passwd);
00481
00482
00483 }
00484
00485 void TGo4AnalysisClient::StopObjectServer()
00486 {
00487
00488 if(fxHistoServer)
00489 {
00490 delete fxHistoServer;
00491 fxHistoServer=0;
00492
00493
00494 } else {}
00495 }
00496
00497 void TGo4AnalysisClient::Terminate ()
00498 {
00499
00500 StopObjectServer();
00501 if(GetTask())
00502 GetTask()->TGo4ThreadManager::Terminate();
00503 }
00504
00505 void TGo4AnalysisClient::TerminateFast ()
00506 {
00507 StopObjectServer();
00508 TGo4Log::Debug("TGo4AnalysisClient::TerminateFast with delete analysis");
00509 if(GetThreadHandler())
00510 {
00511 GetThreadHandler()->StopAll();
00512 GetThreadHandler()->Cancel(fcWatchName);
00513 GetThreadHandler()->Cancel(fcMainName);
00514 GetThreadHandler()->Cancel(GetTask()->GetTaskHandler()->GetDatName());
00515 GetThreadHandler()->Cancel(GetTask()->GetTaskHandler()->GetStatName());
00516 }
00517 delete fxAnalysis;
00518 gApplication->Terminate();
00519 }
00520
00521
00522 void TGo4AnalysisClient::ExecuteString(const Text_t* command)
00523 {
00524 if(strstr(command,"ANHServStart"))
00525 {
00526 Text_t buffer[TGo4ThreadManager::fguTEXTLENGTH];
00527 Text_t base[TGo4ThreadManager::fguTEXTLENGTH];
00528 Text_t pass[TGo4ThreadManager::fguTEXTLENGTH];
00529 snprintf(buffer,TGo4ThreadManager::fguTEXTLENGTH,"%s",command);
00530 strtok(buffer,":");
00531 snprintf(base,TGo4ThreadManager::fguTEXTLENGTH,"%s",strtok(0,":"));
00532 snprintf(pass,TGo4ThreadManager::fguTEXTLENGTH,"%s",strtok(0,":"));
00533 cout <<"ExecuteString found base "<< base<<", passwd "<<pass << endl;
00534 StartObjectServer(base, pass);
00535 }
00536 else if (!strcmp(command,"ANHServStop"))
00537 {
00538 StopObjectServer();
00539 }
00540 else
00541 {
00542 TString comstring="";
00543 const char* cursor = command;
00544 const char* at=0;
00545 do
00546 {
00547 Ssiz_t len=strlen(cursor);
00548 at=strstr(cursor,"@");
00549 if(at)
00550 {
00551
00552 len=(Ssiz_t) (at-cursor);
00553 comstring.Append(cursor,len);
00554 comstring.Append("TGo4Analysis::Instance()->");
00555 cursor=at+1;
00556 }
00557 else
00558 {
00559
00560 comstring.Append(cursor);
00561 }
00562 }
00563 while(at);
00564 TGo4Slave::ExecuteString(comstring.Data());
00565 }
00566 }
00567 Int_t TGo4AnalysisClient::StartWorkThreads()
00568 {
00569
00570 TGo4TaskOwner::StartWorkThreads();
00571 if(GetThreadHandler())
00572 {
00573 GetThreadHandler()->Start(fcMainName);
00574 GetThreadHandler()->Start(fcWatchName);
00575 }
00576 return 0;
00577 }
00578
00579 Int_t TGo4AnalysisClient::StopWorkThreads()
00580 {
00581
00582 TGo4TaskOwner::StopWorkThreads();
00583 if(GetThreadHandler())
00584 {
00585 GetThreadHandler()->Stop(fcMainName);
00586 GetThreadHandler()->Stop(fcWatchName);
00587 }
00588 return 0;
00589 }
00590
00591 void TGo4AnalysisClient::SetCintMode(Bool_t on)
00592 {
00593 fbCintMode=on;
00594 gROOT->SetBatch(!on);
00595 fxAnalysis->SetAutoSave(kFALSE);
00596 if(fbCintMode)
00597 {
00598 if(fxCintLockTimer==0)
00599 fxCintLockTimer=new TGo4CintLockTimer(fguCINTTIMERPERIOD);
00600 fxCintLockTimer->TurnOn();
00601
00602 }
00603 else
00604 {
00605 if (fxCintLockTimer!=0) {
00606 fxCintLockTimer->TurnOff();
00607 delete fxCintLockTimer;
00608 fxCintLockTimer = 0;
00609 }
00610 }
00611 }
00612
00613
00614 ClassImp(TGo4AnalysisClient)
00615
00616
00617
00618
00619