00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
#include "TGo4AnalysisClientImp.h"

#include <stdlib.h>
#include <string.h>

#include "TApplication.h"
#include "TTimeStamp.h"
#include "TDataType.h"
#include "TROOT.h"

#include "TGo4Log.h"
#include "TGo4LockGuard.h"

#include "TGo4CommandInvoker.h"
#include "TGo4AnalysisCommandList.h"
#include "TGo4ComServerQuit.h"
#include "TGo4ThreadHandler.h"
#include "TGo4ThreadManager.h"
#include "TGo4Task.h"
#include "TGo4TaskHandler.h"
#include "TGo4ClientStatus.h"
#include "TGo4AnalysisClientStatus.h"
#include "TGo4AnalysisObjectNames.h"
#include "TGo4AnalysisSniffer.h"
#include "TGo4HistogramServer.h"
#include "TGo4AnalysisStatus.h"
#include "TGo4AnalysisImp.h"
#include "TGo4AnalysisStep.h"

#include "TGo4CintLockTimer.h"
#include "TGo4AnalysisMainRunnable.h"
#include "TGo4AnalysisWatchRunnable.h"
#include "TGo4Ratemeter.h"
#include "TGo4TaskStatus.h"
00047
00048 const char* TGo4AnalysisClient::fgcWATCHTHREAD="WATCH-";
00049 const char* TGo4AnalysisClient::fgcMAINTHREAD="MAIN-";
00050 const UInt_t TGo4AnalysisClient::fguSTATUSUPDATE = 1000;
00051 const Double_t TGo4AnalysisClient::fgdSTATUSTIMEOUT = 2;
00052 const UInt_t TGo4AnalysisClient::fguCINTTIMERPERIOD = 200;
00053
00054
00055 TGo4AnalysisClient::TGo4AnalysisClient(const char* name,
00056 TGo4Analysis* analysis,
00057 const char* host,
00058 UInt_t negport,
00059 Bool_t histoserver,
00060 const char* basename,
00061 const char* passwd,
00062 Bool_t servermode,
00063 Bool_t autorun,
00064 Bool_t cintmode,
00065 Bool_t loadprefs,
00066 Bool_t showrate) :
00067 TGo4Slave(name, servermode, host, negport),
00068 fdBufferUpdateTime(0),
00069 fxHistoServer(0),
00070 fbAutoStart(autorun),
00071 fbCintMode(kFALSE),
00072 fxCintLockTimer(0),
00073 fbLoadPrefs(loadprefs),
00074 fbShowRate(showrate)
00075 {
00076 GO4TRACE((15,"TGo4AnalysisClient::TGo4AnalysisClient(const char*,...)",__LINE__, __FILE__));
00077
00078 if(analysis==0) {
00079 TGo4Log::Debug("!!! AnalysisClient ''%s'': no external analysis specified !!",GetName());
00080 fxAnalysis=TGo4Analysis::Instance();
00081 } else {
00082 fxAnalysis=analysis;
00083 }
00084 fxAnalysis->SetAnalysisClient(this);
00085
00086 Constructor(histoserver,basename,passwd);
00087
00088 SetCintMode(cintmode);
00089 }
00090
00091 TGo4AnalysisClient::TGo4AnalysisClient(int argc, char** argv,
00092 TGo4Analysis* analysis,
00093 Bool_t histoserver,
00094 const char* basename,
00095 const char* passwd,
00096 Bool_t servermode,
00097 Bool_t autorun) :
00098 TGo4Slave(argv[2], servermode, argv[3] , (argc>4) ? atoi(argv[4]) : 5000),
00099 fdBufferUpdateTime(0),
00100 fxHistoServer(0),
00101 fbAutoStart(autorun),
00102 fbCintMode(kFALSE),
00103 fxCintLockTimer(0),
00104 fbLoadPrefs(kTRUE),
00105 fbShowRate(kFALSE)
00106 {
00107 GO4TRACE((15,"TGo4AnalysisClient::TGo4AnalysisClient(int, char**...)",__LINE__, __FILE__));
00108
00109 if(argc<5)
00110 {
00111 TGo4Log::Error("!!! AnalysisClient: missing commandline arguments, aborting !!");
00112 gApplication->Terminate();
00113 }
00114 if(!strcmp("-lGUI",argv[1]))
00115 {
00116 TGo4Log::Error(" !!! AnalysisClient: GUI mode not specified, aborting !!");
00117 gApplication->Terminate();
00118 }
00119
00120 if(analysis==0)
00121 {
00122 TGo4Log::Debug(" !!! AnalysisClient ''%s'': no external analysis specified !!",GetName());
00123 fxAnalysis=TGo4Analysis::Instance();
00124 }
00125 else
00126 {
00127 fxAnalysis=analysis;
00128 }
00129 fxAnalysis->SetAnalysisClient(this);
00130 Constructor(histoserver,basename,passwd);
00131 }
00132
00133 void TGo4AnalysisClient::Constructor(Bool_t starthistserv, const char* basename, const char* passwd)
00134 {
00135 if (IsServer()) {
00136 if (fxAnalysis->fServerObserverPass.Length()>0)
00137 TGo4TaskHandler::SetObservAccount(0, fxAnalysis->fServerObserverPass.Data());
00138 if (fxAnalysis->fServerCtrlPass.Length()>0)
00139 TGo4TaskHandler::SetCtrlAccount(0, fxAnalysis->fServerCtrlPass.Data());
00140 if (fxAnalysis->fServerAdminPass.Length()>0)
00141 TGo4TaskHandler::SetAdminAccount(0, fxAnalysis->fServerAdminPass.Data());
00142 }
00143
00144 fxRatemeter = new TGo4Ratemeter;
00145 TGo4Log::Debug(" AnalysisClient ''%s'' started ",GetName());
00146
00147 fcMainName = TString::Format("%s%s", fgcMAINTHREAD, GetName());
00148 fcWatchName = TString::Format("%s%s", fgcWATCHTHREAD, GetName());
00149
00150 TGo4AnalysisMainRunnable* mainrun =
00151 new TGo4AnalysisMainRunnable(Form("MainRunnable of %s",GetName()), this);
00152 TGo4AnalysisWatchRunnable* watchrun =
00153 new TGo4AnalysisWatchRunnable(Form("WatchRunnable of %s",GetName()), this);
00154
00155
00156 TGo4ThreadHandler* th = GetThreadHandler();
00157 th->NewThread(fcMainName.Data(), mainrun);
00158 th->NewThread(fcWatchName.Data(), watchrun);
00159
00160 TGo4CommandInvoker::Instance();
00161 TGo4CommandInvoker::SetCommandList(new TGo4AnalysisCommandList);
00162 TGo4CommandInvoker::Register("AnalysisClient",this);
00163 TGo4Slave::Stop();
00164 UpdateStatusBuffer();
00165 if(starthistserv) StartObjectServer(basename, passwd);
00166 GetTask()->Launch();
00167 }
00168
00169
00170 TGo4AnalysisClient::~TGo4AnalysisClient()
00171 {
00172 GO4TRACE((15,"TGo4AnalysisClient::~TGo4AnalysisClient()",__LINE__, __FILE__));
00173
00174
00175
00176
00177
00178 fxAnalysis->LockAutoSave();
00179 {
00180 StopObjectServer();
00181 if(GetThreadHandler()) GetThreadHandler()->CancelAll();
00182 }
00183 fxAnalysis->UnLockAutoSave();
00184
00185 delete fxCintLockTimer;
00186 delete fxRatemeter;
00187 delete fxAnalysis;
00188 TGo4CommandInvoker::UnRegister(this);
00189 }
00190
00191 Int_t TGo4AnalysisClient::Initialization()
00192 {
00193 TGo4LockGuard mainguard;
00194 SendStatusMessage(1, kTRUE, TString::Format("AnalysisClient %s starting initialization...",GetName()));
00195
00196 if(!fbAutoStart) {
00197
00198
00199
00200 if (fbLoadPrefs) {
00201 if(fxAnalysis->LoadStatus()) {
00202
00203 SendStatusMessage(1,kTRUE, TString::Format("AnalysisClient %s: Status loaded from %s",
00204 GetName(), TGo4Analysis::fgcDEFAULTSTATUSFILENAME));
00205 } else {
00206 SendStatusMessage(2,kTRUE, TString::Format("AnalysisClient %s: Could not load status from %s",
00207 GetName(), TGo4Analysis::fgcDEFAULTSTATUSFILENAME));
00208 }
00209 }
00210
00211 if(!fxAnalysis->IsAutoSaveOn()){
00212 SendStatusMessage(1,kTRUE, TString::Format("AnalysisClient %s: AUTOSAVE is DISABLED, Initialization did NOT load analysis objects!",
00213 GetName()));
00214 }
00215 else if(fxAnalysis->LoadObjects()) {
00216 SendStatusMessage(1,kTRUE, TString::Format("AnalysisClient %s: Objects loaded.",GetName()));
00217 }
00218 else {
00219
00220 SendStatusMessage(2,kTRUE, TString::Format("AnalysisClient %s: Initialization could not load analysis objects!",GetName()));
00221 }
00222
00223 SendStatusMessage(1,kTRUE, TString::Format("Analysis Slave %s waiting for submit and start commands...",GetName()));
00224 TGo4Slave::Stop();
00225 } else {
00226
00227
00228
00229 SendStatusMessage(1,kTRUE, TString::Format("AnalysisSlave %s initializing analysis...",GetName()));
00230 if(fxAnalysis->InitEventClasses()) {
00231 if(IsCintMode()) {
00232 SendStatusMessage(1,kTRUE, TString::Format("Analysis CINTServer %s in MainCycle suspend mode.",GetName()));
00233 TGo4Slave::Stop();
00234 } else {
00235 SendStatusMessage(1,kTRUE, TString::Format("AnalysisSlave %s starting analysis...",GetName()));
00236 Start();
00237 }
00238 } else {
00239 SendStatusMessage(2,kTRUE, TString::Format("AnalysisSlave %s failed initializing analysis!",GetName()));
00240 TGo4Slave::Stop();
00241 }
00242 }
00243 SendAnalysisStatus();
00244 UpdateStatusBuffer();
00245 SendAnalysisClientStatus();
00246 SendStatusMessage(1,kFALSE, TString::Format("AnalysisClient %s has finished initialization.",GetName()));
00247 return 0;
00248 }
00249
00250
00251
00252 void TGo4AnalysisClient::UpdateStatus(TGo4TaskStatus * state)
00253 {
00254 GO4TRACE((12,"TGo4AnalysisClient::UpdateStatus(TGo4ClientStatus*)",__LINE__, __FILE__));
00255 TGo4Slave::UpdateStatus(state);
00256 TGo4AnalysisClientStatus* anstate= dynamic_cast<TGo4AnalysisClientStatus*> (state);
00257 if (anstate) {
00258 anstate->SetRates(fxRatemeter->GetRate(), fxRatemeter->GetAvRate(), fxRatemeter->GetCurrentCount(),fxRatemeter->GetTime());
00259
00260 anstate->SetRunning(fxAnalysis->IsRunning());
00261
00262 TGo4AnalysisStep* firststep=fxAnalysis->GetAnalysisStep(0);
00263
00264 if(firststep)
00265 anstate->SetCurrentSource(firststep->GetEventSourceName());
00266 else
00267 anstate->SetCurrentSource("- No event source -");
00268 }
00269 }
00270
00271 TGo4TaskStatus* TGo4AnalysisClient::CreateStatus()
00272 {
00273 GO4TRACE((12,"TGo4AnalysisClient::CreateStatus()",__LINE__, __FILE__));
00274
00275 TGo4AnalysisClientStatus* stat= new TGo4AnalysisClientStatus(GetName());
00276 UpdateStatus(stat);
00277 return stat;
00278 }
00279
00280 void TGo4AnalysisClient::Start()
00281 {
00282 GO4TRACE((12,"TGo4AnalysisClient::Start()",__LINE__, __FILE__));
00283 if(fxAnalysis->IsInitDone())
00284 {
00285 if(GetThreadHandler()) GetThreadHandler()->Start(fcMainName.Data());
00286 if(!MainIsRunning()) fxAnalysis->PreLoop();
00287 TGo4Slave::Start();
00288 fxRatemeter->Reset();
00289 fdBufferUpdateTime = TTimeStamp().AsDouble();
00290 SendStatusMessage(1,kTRUE, TString::Format("AnalysisClient %s has started analysis processing.",GetName()));
00291 UpdateRate(-2);
00292 UpdateStatusBuffer();
00293 SendAnalysisClientStatus();
00294 }
00295 else
00296 {
00297
00298 SendStatusMessage(2,kTRUE, TString::Format("Analysis %s was not initialized! Please SUBMIT settings first.",fxAnalysis->GetName()));
00299 }
00300 }
00301
00302 void TGo4AnalysisClient::SendAnalysisObject(const char * name)
00303 {
00304 GO4TRACE((12,"TGo4AnalysisClient::SendAnalysisObject(char* name)",__LINE__, __FILE__));
00305 TNamed* ob = fxAnalysis->GetObject(name);
00306 SendObject(ob);
00307 }
00308
00309 void TGo4AnalysisClient::SendAnalysisStatus()
00310 {
00311 GO4TRACE((12,"TGo4AnalysisClient::SendAnalysisStatus()",__LINE__, __FILE__));
00312 TGo4Analysis* ana = GetAnalysis();
00313 TGo4Log::Debug(" AnalysisClient - sending current analysis settings ");
00314 if(ana) {
00315 TGo4AnalysisStatus* state = ana->CreateStatus();
00316 SendStatus(state);
00317 delete state;
00318 } else {
00319 SendStatusMessage(3,kFALSE, "ERROR sending analysis status: no analysis ");
00320 }
00321 }
00322
00323
00324 void TGo4AnalysisClient::SendAnalysisClientStatus()
00325 {
00326 GO4TRACE((12,"TGo4AnalysisClient::SendAnalysisClientStatus()",__LINE__, __FILE__));
00327
00328 TGo4Log::Debug(" AnalysisClient - sending current analysis client status ");
00329 SendStatusBuffer();
00330
00331
00332
00333
00334
00335 }
00336
00337 void TGo4AnalysisClient::SendNamesList()
00338 {
00339 GO4TRACE((12,"TGo4AnalysisClient::SendNamesList()",__LINE__, __FILE__));
00340
00341 fxAnalysis->UpdateNamesList();
00342 TGo4AnalysisObjectNames* state= fxAnalysis->GetNamesList();
00343 if(state)
00344 {
00345 TGo4Log::Debug(" AnalysisClient - sending names list ");
00346
00347 SendObject(state);
00348 }
00349 else
00350 {
00351 SendStatusMessage(3,kTRUE,"Analysis Client: Send Names List - ERROR: no nameslist !!! ");
00352 }
00353 }
00354
00355 void TGo4AnalysisClient::KillMain()
00356 {
00357 GO4TRACE((12,"TGo4AnalysisClient::KillMain()",__LINE__, __FILE__));
00358 if(GetThreadHandler()) GetThreadHandler()->Stop(fcMainName.Data());
00359
00360 if(GetTask()) GetTask()->WakeCommandQueue();
00361 if(GetThreadHandler()) GetThreadHandler()->Cancel(fcMainName.Data());
00362 SendStatusMessage(2,kTRUE,TString::Format("AnalysisClient %s has killed main analysis thread.",GetName()));
00363 }
00364
00365 void TGo4AnalysisClient::RestartMain()
00366 {
00367 GO4TRACE((12,"TGo4AnalysisClient::RestartMain()",__LINE__, __FILE__));
00368 if(GetThreadHandler()) GetThreadHandler()->Stop(fcMainName.Data());
00369
00370 if(GetTask()) GetTask()->WakeCommandQueue();
00371 if(GetThreadHandler()) {
00372 GetThreadHandler()->ReCreate(fcMainName.Data());
00373 GetThreadHandler()->Start(fcMainName.Data());
00374 }
00375 fxRatemeter->Reset();
00376 SendStatusMessage(2,kTRUE,TString::Format("AnalysisClient %s has killed and relaunched main analysis thread.",GetName()));
00377 }
00378
00379 void TGo4AnalysisClient::Stop()
00380 {
00381 GO4TRACE((12,"TGo4AnalysisClient::Stop()",__LINE__, __FILE__));
00382
00383 if(MainIsRunning()) fxAnalysis->PostLoop();
00384 TGo4Slave::Stop();
00385 SendStatusMessage(1,kTRUE, TString::Format("AnalysisClient %s has STOPPED analysis processing.",GetName()));
00386 if(GetTask()->IsTerminating()) return;
00388 UpdateRate(-1);
00389 UpdateStatusBuffer();
00390
00391 SendAnalysisClientStatus();
00392
00393
00394
00395
00396
00398
00399
00400
00401
00402
00403
00404
00405
00406 }
00407
00408 void TGo4AnalysisClient::UpdateRate(Int_t counts)
00409 {
00410 GO4TRACE((12,"TGo4AnalysisClient::UpdateRate(Int_t)",__LINE__, __FILE__));
00411 if (fxRatemeter->Update(counts)) {
00412
00413 TGo4AnalysisSniffer* sniff = fxAnalysis->GetSniffer();
00414 if (sniff) sniff->RatemeterUpdate(fxRatemeter);
00415
00416 if (fbShowRate) {
00417 TString ratefmt;
00418 ratefmt.Form("\rCnt = %s Rate = %s Ev/s", TGo4Log::GetPrintfArg(kULong64_t),"%5.*f");
00419 int width(1);
00420 if (fxRatemeter->GetRate()>1e4) width=0; else
00421 if (fxRatemeter->GetRate()<1.) width = 3;
00422 printf(ratefmt.Data(), fxRatemeter->GetCurrentCount(), width, fxRatemeter->GetRate());
00423 fflush(stdout);
00424 }
00425 }
00426 }
00427
00428 UInt_t TGo4AnalysisClient::GetCurrentCount()
00429 {
00430 return fxRatemeter->GetCurrentCount();
00431 }
00432
00433 Bool_t TGo4AnalysisClient::TestRatemeter()
00434 {
00435 return fxRatemeter->TestUpdate();
00436 }
00437
00438 Bool_t TGo4AnalysisClient::TestBufferUpdateConditions()
00439 {
00440 Double_t currenttime = TTimeStamp().AsDouble();
00441 Double_t deltatime = currenttime - fdBufferUpdateTime;
00442 UInt_t currentcount = GetCurrentCount();
00443 if( (currentcount && (currentcount % fguSTATUSUPDATE == 0) && (deltatime>0.1*fgdSTATUSTIMEOUT))
00444 || (deltatime > fgdSTATUSTIMEOUT) ) {
00445
00446 fdBufferUpdateTime = currenttime;
00447 return kTRUE;
00448 }
00449
00450 return kFALSE;
00451 }
00452
00453
00454 void TGo4AnalysisClient::StartObjectServer(const char* basename, const char* passwd)
00455 {
00456 StopObjectServer();
00457 fxHistoServer= new TGo4HistogramServer(this,basename,passwd,kFALSE);
00458
00459
00460
00461
00462 }
00463
00464 void TGo4AnalysisClient::StopObjectServer()
00465 {
00466
00467 if(fxHistoServer) {
00468 delete fxHistoServer;
00469 fxHistoServer=0;
00470
00471
00472 }
00473 }
00474
00475 void TGo4AnalysisClient::Quit()
00476 {
00477
00478 Stop();
00479 fxAnalysis->CloseAnalysis();
00480 }
00481
00482
00483
00484 void TGo4AnalysisClient::Terminate(Bool_t termapp)
00485 {
00486 SetCintMode(kFALSE);
00487 StopObjectServer();
00488 if(GetTask())
00489 GetTask()->TGo4ThreadManager::Terminate(termapp);
00490 }
00491
00492 void TGo4AnalysisClient::TerminateFast()
00493 {
00494 StopObjectServer();
00495 TGo4Log::Debug("TGo4AnalysisClient::TerminateFast with delete analysis");
00496 if(GetThreadHandler()) {
00497 GetThreadHandler()->StopAll();
00498 GetThreadHandler()->Cancel(fcWatchName.Data());
00499 GetThreadHandler()->Cancel(fcMainName.Data());
00500 GetThreadHandler()->Cancel(GetTask()->GetTaskHandler()->GetDatName());
00501 GetThreadHandler()->Cancel(GetTask()->GetTaskHandler()->GetStatName());
00502 }
00503 delete fxAnalysis;
00504 gApplication->Terminate();
00505 }
00506
00507 void TGo4AnalysisClient::SubmitShutdown()
00508 {
00509 if(GetTask())
00510 {
00511 TGo4ComServerQuit* com= new TGo4ComServerQuit();
00512
00513 GetTask()->SubmitLocalCommand(com);
00514 }
00515
00516 }
00517
00518
00519 void TGo4AnalysisClient::ExecuteString(const char* command)
00520 {
00521 if(strstr(command,"ANHServStart")) {
00522 TString buffer = command;
00523 strtok((char*) buffer.Data(), ":");
00524 TString base = strtok(0,":");
00525 TString pass = strtok(0,":");
00526
00527 StartObjectServer(base.Data(), pass.Data());
00528 } else
00529 if (!strcmp(command,"ANHServStop")) {
00530 StopObjectServer();
00531 } else {
00532 TString comstring="";
00533 const char* cursor = command;
00534 const char* at=0;
00535 do
00536 {
00537 Ssiz_t len=strlen(cursor);
00538 at = strstr(cursor,"@");
00539 if(at)
00540 {
00541
00542 len=(Ssiz_t) (at-cursor);
00543 comstring.Append(cursor,len);
00544 comstring.Append("TGo4Analysis::Instance()->");
00545 cursor=at+1;
00546 }
00547 else
00548 {
00549
00550 comstring.Append(cursor);
00551 }
00552 }
00553 while(at);
00554 TGo4Slave::ExecuteString(comstring.Data());
00555 }
00556 }
00557 Int_t TGo4AnalysisClient::StartWorkThreads()
00558 {
00559
00560 TGo4TaskOwner::StartWorkThreads();
00561 if(GetThreadHandler()) {
00562 GetThreadHandler()->Start(fcMainName.Data());
00563 GetThreadHandler()->Start(fcWatchName.Data());
00564 }
00565 return 0;
00566 }
00567
00568 Int_t TGo4AnalysisClient::StopWorkThreads()
00569 {
00570
00571 TGo4TaskOwner::StopWorkThreads();
00572 if(GetThreadHandler()) {
00573 GetThreadHandler()->Stop(fcMainName.Data());
00574 GetThreadHandler()->Stop(fcWatchName.Data());
00575 }
00576 return 0;
00577 }
00578
00579 void TGo4AnalysisClient::SetCintMode(Bool_t on)
00580 {
00581 fbCintMode = on;
00582
00583 if (fbCintMode) {
00584 gROOT->SetBatch(kFALSE);
00585 fxAnalysis->SetAutoSave(kFALSE);
00586 }
00587 #if ROOT_VERSION_CODE > ROOT_VERSION(5,2,0)
00588
00590
00591
00592 #else
00593 if(fbCintMode) {
00594 if(fxCintLockTimer==0)
00595 fxCintLockTimer=new TGo4CintLockTimer(this,fguCINTTIMERPERIOD);
00596 fxCintLockTimer->TurnOn();
00597 }
00598 else {
00599 if (fxCintLockTimer!=0) {
00600 fxCintLockTimer->TurnOff();
00601 delete fxCintLockTimer;
00602 fxCintLockTimer = 0;
00603 }
00604 }
00605 #endif
00606 }
00607
00608 void TGo4AnalysisClient::LockAll()
00609 {
00610 TGo4Task* task=GetTask();
00611 if(task) task->LockAll();
00612 }
00613
00614 void TGo4AnalysisClient::UnLockAll()
00615 {
00616 TGo4Task* task=GetTask();
00617 if(task) task->UnLockAll();
00618 }
00619
00620 void TGo4AnalysisClient::SendStatusMessage(Int_t level, Bool_t printout, const TString& text)
00621 {
00622 TGo4Slave::SendStatusMessage(level, printout, text);
00623
00624 TGo4AnalysisSniffer* sniff = fxAnalysis->GetSniffer();
00625 if (sniff) sniff->StatusMessage(level, text);
00626
00627 }