00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4AnalysisClientImp.h"
00017
00018 #include "Riostream.h"
00019 #include <stdlib.h>
00020 #include "TApplication.h"
00021 #include "TStopwatch.h"
00022 #include "snprintf.h"
00023 #include "TROOT.h"
00024
00025 #include "TGo4Log.h"
00026 #include "TGo4LockGuard.h"
00027
00028 #include "TGo4CommandInvoker.h"
00029 #include "TGo4AnalysisCommandList.h"
00030 #include "TGo4ThreadHandler.h"
00031 #include "TGo4ThreadManager.h"
00032 #include "TGo4Task.h"
00033 #include "TGo4TaskHandler.h"
00034 #include "TGo4ClientStatus.h"
00035 #include "TGo4AnalysisClientStatus.h"
00036 #include "TGo4AnalysisObjectNames.h"
00037 #include "TGo4HistogramServer.h"
00038 #include "TGo4AnalysisStatus.h"
00039 #include "TGo4AnalysisImp.h"
00040 #include "TGo4AnalysisStep.h"
00041
00042 #include "TGo4CintLockTimer.h"
00043 #include "TGo4AnalysisMainRunnable.h"
00044 #include "TGo4AnalysisWatchRunnable.h"
00045 #include "TGo4Ratemeter.h"
00046 #include "TGo4TaskStatus.h"
00047
00048 class TGo4InterruptHandler : public TSignalHandler {
00049 public:
00050 TGo4InterruptHandler(TSignalHandler* old = 0) :
00051 TSignalHandler(kSigInterrupt, kFALSE),
00052 oldHandler(old),
00053 fbActive(kFALSE)
00054 {
00055 }
00056 virtual Bool_t Notify()
00057 {
00058 if (fbActive) return kTRUE;
00059 fbActive = kTRUE;
00060 Bool_t res = kTRUE;
00061 if (TGo4Analysis::Instance()!=0) {
00062 TGo4Analysis::Instance()->StopWaiting();
00063 }
00064 if (oldHandler!=0) res = oldHandler->Notify();
00065 TTimer::SingleShot(3000, ClassName(), this, "Pop()");
00066 return res;
00067 }
00068
00069 virtual void Pop()
00070 {
00071
00072 fbActive = kFALSE;
00073 TGo4Analysis::Instance()->SetRunning(kFALSE);
00074 }
00075
00076 protected:
00077 TSignalHandler* oldHandler;
00078 Bool_t fbActive;
00079 };
00080
00081
00082 const Text_t TGo4AnalysisClient::fgcWATCHTHREAD[]="WATCH-";
00083 const Text_t TGo4AnalysisClient::fgcMAINTHREAD[]="MAIN-";
00084 const UInt_t TGo4AnalysisClient::fguSTATUSUPDATE = 1000;
00085 const Double_t TGo4AnalysisClient::fgdSTATUSTIMEOUT = 2;
00086 const UInt_t TGo4AnalysisClient::fguCINTTIMERPERIOD = 200;
00087
00088 TGo4AnalysisClient::TGo4AnalysisClient(const char* name,
00089 TGo4Analysis* analysis,
00090 const char* host,
00091 UInt_t negport,
00092 Bool_t histoserver,
00093 const char* basename,
00094 const char* passwd,
00095 Bool_t servermode,
00096 Bool_t autorun,
00097 Bool_t clientmode)
00098 : TGo4Slave(name, servermode, host, negport),
00099 fdBufferUpdateTime(0), fxHistoServer(0), fbAutoStart(autorun), fbCintMode(kFALSE),fxCintLockTimer(0)
00100
00101 {
00102 TRACE((15,"TGo4AnalysisClient::TGo4AnalysisClient(const char*,...)",__LINE__, __FILE__));
00103
00104 if(analysis==0)
00105 {
00106 TGo4Log::Debug("!!! AnalysisClient ''%s'': no external analysis specified !!",GetName());
00107 fxAnalysis=TGo4Analysis::Instance();
00108 }
00109 else
00110 {
00111 fxAnalysis=analysis;
00112 }
00113 fxAnalysis->SetAnalysisClient(this);
00114 Constructor(histoserver,basename,passwd);
00115
00116 SetCintMode(clientmode);
00117
00118 if (clientmode) {
00119 TSignalHandler* oldhandler = gApplication->GetSignalHandler();
00120 oldhandler->Remove();
00121
00122 fxInterruptHandler = new TGo4InterruptHandler(oldhandler);
00123 fxInterruptHandler->Add();
00124 }
00125
00126 }
00127
00128 TGo4AnalysisClient::TGo4AnalysisClient(int argc, char** argv,
00129 TGo4Analysis* analysis,
00130 Bool_t histoserver,
00131 const char* basename,
00132 const char* passwd,
00133 Bool_t servermode,
00134 Bool_t autorun)
00135 : TGo4Slave(argv[2],servermode,
00136 argv[3] , (argc>4) ? atoi(argv[4]) : 5000),
00137 fdBufferUpdateTime(0), fxHistoServer(0), fbAutoStart(autorun), fbCintMode(kFALSE),fxCintLockTimer(0)
00138
00139 {
00140 TRACE((15,"TGo4AnalysisClient::TGo4AnalysisClient(int, char**...)",__LINE__, __FILE__));
00141
00142 if(argc<5)
00143 {
00144 TGo4Log::Error("!!! AnalysisClient: missing commandline arguments, aborting !!");
00145 gApplication->Terminate();
00146 }
00147 if(!strcmp("-lGUI",argv[1]))
00148 {
00149 TGo4Log::Error(" !!! AnalysisClient: GUI mode not specified, aborting !!");
00150 gApplication->Terminate();
00151 }
00152
00153 if(analysis==0)
00154 {
00155 TGo4Log::Debug(" !!! AnalysisClient ''%s'': no external analysis specified !!",GetName());
00156 fxAnalysis=TGo4Analysis::Instance();
00157 }
00158 else
00159 {
00160 fxAnalysis=analysis;
00161 }
00162 fxAnalysis->SetAnalysisClient(this);
00163 Constructor(histoserver,basename,passwd);
00164 }
00165
00166 void TGo4AnalysisClient::Constructor(Bool_t starthistserv, const char* basename, const char* passwd)
00167 {
00168 fxUpdateWatch = new TStopwatch;
00169 fxRatemeter= new TGo4Ratemeter;
00170 TGo4Log::Debug(" AnalysisClient ''%s'' started ",GetName());
00171 fcMainName= new Text_t[TGo4ThreadManager::fguTEXTLENGTH];
00172 fcWatchName= new Text_t[TGo4ThreadManager::fguTEXTLENGTH];
00173 Text_t namebuffer[TGo4ThreadManager::fguTEXTLENGTH];
00174 snprintf(namebuffer, TGo4ThreadManager::fguTEXTLENGTH-1 ,"MainRunnable of %s",GetName());
00175 TGo4AnalysisMainRunnable* mainrun= new TGo4AnalysisMainRunnable(namebuffer, this);
00176 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1,"WatchRunnable of %s",GetName());
00177 TGo4AnalysisWatchRunnable* watchrun= new TGo4AnalysisWatchRunnable(namebuffer, this);
00178
00179 snprintf(fcMainName, TGo4ThreadManager::fguTEXTLENGTH-1,"%s%s",fgcMAINTHREAD,GetName());
00180 TGo4ThreadHandler* th=GetThreadHandler();
00181 th->NewThread(fcMainName,mainrun);
00182 snprintf(fcWatchName, TGo4ThreadManager::fguTEXTLENGTH-1,"%s%s",fgcWATCHTHREAD,GetName());
00183 th->NewThread(fcWatchName,watchrun);
00184 TGo4CommandInvoker::Instance();
00185 TGo4CommandInvoker::SetCommandList(new TGo4AnalysisCommandList);
00186 TGo4CommandInvoker::Register("AnalysisClient",this);
00187 TGo4Slave::Stop();
00188 UpdateStatusBuffer();
00189 if(starthistserv) StartObjectServer(basename, passwd);
00190 GetTask()->Launch();
00191
00192 fxInterruptHandler = 0;
00193 }
00194
00195
00196 TGo4AnalysisClient::~TGo4AnalysisClient()
00197 {
00198 TRACE((15,"TGo4AnalysisClient::~TGo4AnalysisClient()",__LINE__, __FILE__));
00199 if (fxInterruptHandler!=0) {
00200
00201 fxInterruptHandler->Remove();
00202 delete fxInterruptHandler;
00203
00204
00205 TSignalHandler* oldhandler = gApplication->GetSignalHandler();
00206 oldhandler->Add();
00207 }
00208
00209
00210
00211
00212
00213
00214
00215 fxAnalysis->LockAutoSave();
00216 {
00217 StopObjectServer();
00218 if(GetThreadHandler()) GetThreadHandler()->CancelAll();
00219 }
00220 fxAnalysis->UnLockAutoSave();
00221
00222 delete fxCintLockTimer;
00223 delete fxRatemeter;
00224 delete fxUpdateWatch;
00225 delete fxAnalysis;
00226 delete [] fcMainName;
00227 delete [] fcWatchName;
00228 TGo4CommandInvoker::UnRegister(this);
00229 }
00230
00231 Int_t TGo4AnalysisClient::Initialization()
00232 {
00233 TGo4LockGuard mainguard;
00234 SendStatusMessage(1,kTRUE,"AnalysisClient %s starting initialization...",GetName());
00235
00236 if(!fbAutoStart)
00237 {
00238
00239
00240 if(fxAnalysis->LoadStatus())
00241 {
00242
00243 SendStatusMessage(1,kTRUE,"AnalysisClient %s: Status loaded from %s",
00244 GetName(), TGo4Analysis::fgcDEFAULTSTATUSFILENAME);
00245 }
00246 else
00247 {
00248 SendStatusMessage(2,kTRUE,"AnalysisClient %s: Could not load status from %s",
00249 GetName(), TGo4Analysis::fgcDEFAULTSTATUSFILENAME);
00250 }
00251
00252 if(fxAnalysis->LoadObjects())
00253 {
00254 SendStatusMessage(1,kTRUE,"AnalysisClient %s: Objects loaded.",GetName());
00255 }
00256 else
00257 {
00258
00259 SendStatusMessage(2,kTRUE,"AnalysisClient %s: Initialization could not load analysis objects!",GetName());
00260 }
00261
00262 SendStatusMessage(1,kTRUE,"Analysis Slave %s waiting for submit and start commands...",GetName());
00263 TGo4Slave::Stop();
00264 }
00265 else
00266 {
00267
00268
00269
00270 SendStatusMessage(1,kTRUE,"AnalysisSlave %s initializing analysis...",GetName());
00271 if(fxAnalysis->InitEventClasses())
00272 {
00273 if(IsCintMode())
00274 {
00275 SendStatusMessage(1,kTRUE,"Analysis CINTServer %s in MainCycle suspend mode.",GetName());
00276 TGo4Slave::Stop();
00277 }
00278 else
00279 {
00280 SendStatusMessage(1,kTRUE,"AnalysisSlave %s starting analysis...",GetName());
00281 Start();
00282 }
00283 }
00284 else
00285 {
00286 SendStatusMessage(2,kTRUE,"AnalysisSlave %s failed initializing analysis!",GetName());
00287 TGo4Slave::Stop();
00288 }
00289 }
00290 SendAnalysisStatus();
00291 UpdateStatusBuffer();
00292 SendAnalysisClientStatus();
00293 SendStatusMessage(1,kFALSE,"AnalysisClient %s has finished initialization.",GetName());
00294 return 0;
00295 }
00296
00297
00298
00299 void TGo4AnalysisClient::UpdateStatus(TGo4TaskStatus * state)
00300 {
00301 TRACE((12,"TGo4AnalysisClient::UpdateStatus(TGo4ClientStatus*)",__LINE__, __FILE__));
00302 TGo4Slave::UpdateStatus(state);
00303 TGo4AnalysisClientStatus* anstate= dynamic_cast<TGo4AnalysisClientStatus*> (state);
00304 if (anstate)
00305 {
00306 Double_t rate=fxRatemeter->GetRate();
00307 Double_t avrate=fxRatemeter->GetAvRate();
00308 UInt_t n=fxRatemeter->GetCurrentCount();
00309 Double_t t=fxRatemeter->GetTime();
00310 anstate->SetRates(rate, avrate, n,t);
00311
00312 anstate->SetRunning(fxAnalysis->IsRunning());
00313
00314 TGo4AnalysisStep* firststep=fxAnalysis->GetAnalysisStep(0);
00315
00316 if(firststep)
00317 anstate->SetCurrentSource(firststep->GetEventSourceName());
00318 else
00319 anstate->SetCurrentSource("- No event source -");
00320 }
00321 else
00322 {
00323
00324 }
00325 }
00326 TGo4TaskStatus* TGo4AnalysisClient::CreateStatus()
00327 {
00328 TRACE((12,"TGo4AnalysisClient::CreateStatus()",__LINE__, __FILE__));
00329
00330 TGo4AnalysisClientStatus* stat= new TGo4AnalysisClientStatus(GetName());
00331 UpdateStatus(stat);
00332 return stat;
00333 }
00334
00335 void TGo4AnalysisClient::Start()
00336 {
00337 TRACE((12,"TGo4AnalysisClient::Start()",__LINE__, __FILE__));
00338 if(fxAnalysis->IsInitDone())
00339 {
00340 if(GetThreadHandler()) GetThreadHandler()->Start(fcMainName);
00341 if(!MainIsRunning()) fxAnalysis->PreLoop();
00342 TGo4Slave::Start();
00343 fxRatemeter->Reset();
00344
00345 fdBufferUpdateTime=fxUpdateWatch->RealTime();
00346 fxUpdateWatch->Continue();
00347 SendStatusMessage(1,kTRUE,"AnalysisClient %s has started analysis processing.",GetName());
00348 UpdateRate(-2);
00349 UpdateStatusBuffer();
00350 SendAnalysisClientStatus();
00351 }
00352 else
00353 {
00354
00355 SendStatusMessage(2,kTRUE,"Analysis %s was not initialized! Please SUBMIT settings first.",fxAnalysis->GetName());
00356 }
00357 }
00358
00359 void TGo4AnalysisClient::SendAnalysisObject(const Text_t * name)
00360 {
00361 TRACE((12,"TGo4AnalysisClient::SendAnalysisObject(Text_t* name)",__LINE__, __FILE__));
00362 TNamed* ob=fxAnalysis->GetObject(name);
00363 SendObject(ob);
00364 }
00365
00366 void TGo4AnalysisClient::SendAnalysisStatus()
00367 {
00368 TRACE((12,"TGo4AnalysisClient::SendAnalysisStatus()",__LINE__, __FILE__));
00369
00370 TGo4Analysis* ana=GetAnalysis();
00371 TGo4Log::Debug(" AnalysisClient - sending current analysis settings ");
00372 if(ana)
00373 {
00374 TGo4AnalysisStatus* state=ana->CreateStatus();
00375 SendStatus(state);
00376 delete state;
00377 }
00378 else
00379 {
00380 SendStatusMessage(3,kFALSE, "ERROR sending analysis status: no analysis ");
00381 }
00382 }
00383
00384 void TGo4AnalysisClient::SendAnalysisClientStatus()
00385 {
00386 TRACE((12,"TGo4AnalysisClient::SendAnalysisClientStatus()",__LINE__, __FILE__));
00387
00388 TGo4Log::Debug(" AnalysisClient - sending current analysis client status ");
00389 SendStatusBuffer();
00390
00391
00392
00393
00394
00395 }
00396
00397 void TGo4AnalysisClient::SendNamesList()
00398 {
00399 TRACE((12,"TGo4AnalysisClient::SendNamesList()",__LINE__, __FILE__));
00400
00401 fxAnalysis->UpdateNamesList();
00402 TGo4AnalysisObjectNames* state= fxAnalysis->GetNamesList();
00403 if(state)
00404 {
00405 TGo4Log::Debug(" AnalysisClient - sending names list ");
00406
00407 SendObject(state);
00408 }
00409 else
00410 {
00411 SendStatusMessage(3,kTRUE,"Analysis Client: Send Names List - ERROR: no nameslist !!! ");
00412 }
00413 }
00414
00415 void TGo4AnalysisClient::KillMain()
00416 {
00417 TRACE((12,"TGo4AnalysisClient::KillMain()",__LINE__, __FILE__));
00418 if(GetThreadHandler()) GetThreadHandler()->Stop(fcMainName);
00419
00420 if(GetTask()) GetTask()->WakeCommandQueue();
00421 if(GetThreadHandler()) GetThreadHandler()->Cancel(fcMainName);
00422 SendStatusMessage(2,kTRUE,"AnalysisClient %s has killed main analysis thread.",GetName());
00423 }
00424
00425 void TGo4AnalysisClient::RestartMain()
00426 {
00427 TRACE((12,"TGo4AnalysisClient::RestartMain()",__LINE__, __FILE__));
00428 if(GetThreadHandler()) GetThreadHandler()->Stop(fcMainName);
00429
00430 if(GetTask()) GetTask()->WakeCommandQueue();
00431 if(GetThreadHandler())
00432 {
00433 GetThreadHandler()->ReCreate(fcMainName);
00434 GetThreadHandler()->Start(fcMainName);
00435 }
00436 fxRatemeter->Reset();
00437 SendStatusMessage(2,kTRUE,"AnalysisClient %s has killed and relaunched main analysis thread.",GetName());
00438 }
00439
00440 void TGo4AnalysisClient::Stop()
00441 {
00442 TRACE((12,"TGo4AnalysisClient::Stop()",__LINE__, __FILE__));
00443 if(MainIsRunning()) fxAnalysis->PostLoop();
00444 TGo4Slave::Stop();
00445 SendStatusMessage(1,kTRUE,"AnalysisClient %s has STOPPED analysis processing.",GetName());
00446 if(GetTask()->IsTerminating()) return;
00448 UpdateRate(-1);
00449 UpdateStatusBuffer();
00450
00451 SendAnalysisClientStatus();
00452 }
00453
00454 void TGo4AnalysisClient::UpdateRate(Int_t counts)
00455 {
00456 TRACE((12,"TGo4AnalysisClient::UpdateRate(Int_t)",__LINE__, __FILE__));
00457 fxRatemeter->Update(counts);
00458 }
00459 UInt_t TGo4AnalysisClient::GetCurrentCount()
00460 {
00461 return (fxRatemeter->GetCurrentCount());
00462 }
00463
00464 Bool_t TGo4AnalysisClient::TestRatemeter()
00465 {
00466 return (fxRatemeter->TestUpdate());
00467 }
00468
00469 Bool_t TGo4AnalysisClient::TestBufferUpdateConditions()
00470 {
00471
00472 Double_t currenttime=fxUpdateWatch->RealTime();
00473 fxUpdateWatch->Continue();
00474 Double_t deltatime=currenttime-fdBufferUpdateTime;
00475 UInt_t currentcount=GetCurrentCount();
00476 if( (currentcount && (currentcount % fguSTATUSUPDATE == 0)) || (deltatime>fgdSTATUSTIMEOUT) )
00477 {
00478
00479 fdBufferUpdateTime=currenttime;
00480 return kTRUE;
00481 }
00482 else
00483 {
00484 return kFALSE;
00485 }
00486 }
00487
00488
00489 void TGo4AnalysisClient::StartObjectServer(const Text_t* basename, const Text_t* passwd)
00490 {
00491 StopObjectServer();
00492 fxHistoServer= new TGo4HistogramServer(this,basename,passwd,kFALSE);
00493
00494
00495
00496
00497 }
00498
00499 void TGo4AnalysisClient::StopObjectServer()
00500 {
00501
00502 if(fxHistoServer)
00503 {
00504 delete fxHistoServer;
00505 fxHistoServer=0;
00506
00507
00508 } else {}
00509 }
00510
00511 void TGo4AnalysisClient::Terminate (Bool_t termapp)
00512 {
00513 SetCintMode(kFALSE);
00514 StopObjectServer();
00515 if(GetTask())
00516 GetTask()->TGo4ThreadManager::Terminate(termapp);
00517 }
00518
00519 void TGo4AnalysisClient::TerminateFast ()
00520 {
00521 StopObjectServer();
00522 TGo4Log::Debug("TGo4AnalysisClient::TerminateFast with delete analysis");
00523 if(GetThreadHandler())
00524 {
00525 GetThreadHandler()->StopAll();
00526 GetThreadHandler()->Cancel(fcWatchName);
00527 GetThreadHandler()->Cancel(fcMainName);
00528 GetThreadHandler()->Cancel(GetTask()->GetTaskHandler()->GetDatName());
00529 GetThreadHandler()->Cancel(GetTask()->GetTaskHandler()->GetStatName());
00530 }
00531 delete fxAnalysis;
00532 gApplication->Terminate();
00533 }
00534
00535
00536 void TGo4AnalysisClient::ExecuteString(const Text_t* command)
00537 {
00538 if(strstr(command,"ANHServStart"))
00539 {
00540 Text_t buffer[TGo4ThreadManager::fguTEXTLENGTH];
00541 Text_t base[TGo4ThreadManager::fguTEXTLENGTH];
00542 Text_t pass[TGo4ThreadManager::fguTEXTLENGTH];
00543 snprintf(buffer,TGo4ThreadManager::fguTEXTLENGTH,"%s",command);
00544 strtok(buffer,":");
00545 snprintf(base,TGo4ThreadManager::fguTEXTLENGTH,"%s",strtok(0,":"));
00546 snprintf(pass,TGo4ThreadManager::fguTEXTLENGTH,"%s",strtok(0,":"));
00547 cout <<"ExecuteString found base "<< base<<", passwd "<<pass << endl;
00548 StartObjectServer(base, pass);
00549 }
00550 else if (!strcmp(command,"ANHServStop"))
00551 {
00552 StopObjectServer();
00553 }
00554 else
00555 {
00556 TString comstring="";
00557 const char* cursor = command;
00558 const char* at=0;
00559 do
00560 {
00561 Ssiz_t len=strlen(cursor);
00562 at=strstr(cursor,"@");
00563 if(at)
00564 {
00565
00566 len=(Ssiz_t) (at-cursor);
00567 comstring.Append(cursor,len);
00568 comstring.Append("TGo4Analysis::Instance()->");
00569 cursor=at+1;
00570 }
00571 else
00572 {
00573
00574 comstring.Append(cursor);
00575 }
00576 }
00577 while(at);
00578 TGo4Slave::ExecuteString(comstring.Data());
00579 }
00580 }
00581 Int_t TGo4AnalysisClient::StartWorkThreads()
00582 {
00583
00584 TGo4TaskOwner::StartWorkThreads();
00585 if(GetThreadHandler())
00586 {
00587 GetThreadHandler()->Start(fcMainName);
00588 GetThreadHandler()->Start(fcWatchName);
00589 }
00590 return 0;
00591 }
00592
00593 Int_t TGo4AnalysisClient::StopWorkThreads()
00594 {
00595
00596 TGo4TaskOwner::StopWorkThreads();
00597 if(GetThreadHandler())
00598 {
00599 GetThreadHandler()->Stop(fcMainName);
00600 GetThreadHandler()->Stop(fcWatchName);
00601 }
00602 return 0;
00603 }
00604
00605 void TGo4AnalysisClient::SetCintMode(Bool_t on)
00606 {
00607 fbCintMode=on;
00608 gROOT->SetBatch(!on);
00609 fxAnalysis->SetAutoSave(kFALSE);
00610 #if ROOT_VERSION_CODE > ROOT_VERSION(5,2,0)
00611
00613
00614
00615 #else
00616 if(fbCintMode)
00617 {
00618 if(fxCintLockTimer==0)
00619 fxCintLockTimer=new TGo4CintLockTimer(this,fguCINTTIMERPERIOD);
00620 fxCintLockTimer->TurnOn();
00621
00622 }
00623 else
00624 {
00625 if (fxCintLockTimer!=0) {
00626 fxCintLockTimer->TurnOff();
00627 delete fxCintLockTimer;
00628 fxCintLockTimer = 0;
00629 }
00630
00631 }
00632 #endif
00633 }
00634
00635 void TGo4AnalysisClient::LockAll()
00636 {
00637 TGo4Task* task=GetTask();
00638 if(task) task->LockAll();
00639 }
00640
00641 void TGo4AnalysisClient::UnLockAll()
00642 {
00643 TGo4Task* task=GetTask();
00644 if(task) task->UnLockAll();
00645 }
00646
00647