00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4Task.h"
00017
00018 #include "TFile.h"
00019 #include "TMutex.h"
00020 #include "TROOT.h"
00021 #include "snprintf.h"
00022
00023 #include "TGo4Log.h"
00024 #include "TGo4Buffer.h"
00025 #include "TGo4LockGuard.h"
00026 #include "TGo4CommandInvoker.h"
00027 #include "TGo4RemoteCommand.h"
00028 #include "TGo4LocalCommandRunnable.h"
00029 #include "TGo4ObjectQueue.h"
00030 #include "TGo4BufferQueue.h"
00031 #include "TGo4ThreadHandler.h"
00032 #include "TGo4TaskStatus.h"
00033 #include "TGo4TaskHandler.h"
00034 #include "TGo4TaskHandlerCommandList.h"
00035 #include "TGo4Master.h"
00036 #include "TGo4Slave.h"
00037
00038 const Int_t TGo4Task::fgiTERMID=999;
00039
00040 TGo4Task::TGo4Task(const char* name, Bool_t blockingmode,
00041 Bool_t autostart,
00042 Bool_t autocreate,
00043 Bool_t ismaster)
00044 : TGo4ThreadManager(name,blockingmode, autostart,autocreate),
00045 fbCommandMaster(ismaster), fxMaster(0), fxSlave(0),fxOwner(0),
00046 fbWorkIsStopped(kFALSE),fxStopBuffer(0),fxQuitBuffer(0)
00047 {
00048 fxCommandPrototype=0;
00049 fxStatusBuffer= new TGo4Buffer(TBuffer::kWrite);
00050 fxStatusMutex= new TMutex(kTRUE);
00051 fxStopBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComCloseInput);
00052 fxQuitBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComQuit);
00053 fxAbortBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComAbortTask);
00054
00055 TGo4CommandInvoker::Instance();
00056 TGo4CommandInvoker::SetCommandList(CreateCommandList());
00057
00058 TGo4CommandInvoker::Register("NoReceiver", this);
00059 TGo4CommandInvoker::Register("Task",this);
00060
00061
00062
00063 fxLocalCommandQueue = new TGo4ObjectQueue("localcommands");
00064
00065
00066 TString nomen("LocalCommandRunnable of "); nomen+=name;
00067 TGo4LocalCommandRunnable* commander = new TGo4LocalCommandRunnable(nomen.Data(), this);
00068
00069 fxCommanderName = "COMMANDER-"; fxCommanderName += name;
00070 GetWorkHandler()->NewThread(GetCommanderName(), commander);
00071 }
00072
00073 TGo4Task::~TGo4Task()
00074 {
00075 if(fxOwner)
00076 {
00077 fxOwner->SetTask(0,kFALSE);
00078 delete fxOwner;
00079 }
00080 delete fxLocalCommandQueue;
00081 delete fxCommandPrototype;
00082 delete fxQuitBuffer;
00083 delete fxStopBuffer;
00084 delete fxStatusBuffer;
00085 delete fxStatusMutex;
00086 TGo4CommandInvoker::UnRegister(this);
00087 }
00088
00089 void TGo4Task::SetMaster(TGo4Master* m)
00090 {
00091 if(m!=0) SetMaster(kTRUE);
00092 fxMaster=m;
00093 }
00094
00095 void TGo4Task::SetSlave(TGo4Slave* s)
00096 {
00097 if(s!=0) SetMaster(kFALSE);
00098 fxSlave=s;
00099 }
00100
00101 void TGo4Task::SetOwner(TGo4TaskOwner* owner)
00102 {
00103 fxOwner=owner;
00104 SetMaster(dynamic_cast<TGo4Master*>(owner));
00105 SetSlave(dynamic_cast<TGo4Slave*>(owner));
00106 }
00107
00108 void TGo4Task::Start()
00109 {
00110 if(fxSlave) fxSlave->Start();
00111 }
00112
00113 void TGo4Task::Stop()
00114 {
00115 if(fxSlave) fxSlave->Stop();
00116 }
00117
00118 void TGo4Task::Quit()
00119 {
00120 if(fxSlave) fxSlave->Quit();
00121 }
00122
00123 void TGo4Task::KillMain()
00124 {
00125 if(fxSlave) fxSlave->KillMain();
00126
00127 }
00128 void TGo4Task::RestartMain()
00129 {
00130 if(fxSlave) fxSlave->RestartMain();
00131 }
00132
00133 void TGo4Task::Terminate (Bool_t termapp)
00134 {
00135 if(fxSlave)
00136 fxSlave->Terminate(termapp);
00137 else
00138 TGo4ThreadManager::Terminate(termapp);
00139 }
00140
00141 void TGo4Task::TerminateFast ()
00142 {
00143 if(fxSlave)
00144 fxSlave->TerminateFast();
00145 else
00146 TGo4ThreadManager::TerminateFast();
00147 }
00148
00149 void TGo4Task::ExecuteString(const Text_t* command)
00150 {
00151 if(fxSlave)
00152 fxSlave->ExecuteString(command);
00153 else
00154 gROOT->ProcessLine(command);
00155
00156 }
00157
00158 TGo4TaskHandler* TGo4Task::GetTaskHandler()
00159 {
00160 return 0;
00161 }
00162
00163 TGo4BufferQueue* TGo4Task::GetCommandQueue(const char*)
00164 {
00165 return 0;
00166 }
00167
00168 TGo4BufferQueue * TGo4Task::GetStatusQueue(const char*)
00169 {
00170 return 0;
00171 }
00172 TGo4BufferQueue * TGo4Task::GetDataQueue(const char*)
00173 {
00174 return 0;
00175 }
00176
00177 TGo4TaskHandlerCommandList * TGo4Task::GetPrototype()
00178 {
00179
00180 return fxCommandPrototype;
00181 }
00182
00183 TGo4Status * TGo4Task::NextStatus(Bool_t wait)
00184 {
00185 if(!IsMaster()) return 0;
00186 TObject* obj=0;
00187 TGo4Status* stat=0;
00188 TGo4BufferQueue* statqueue=dynamic_cast<TGo4BufferQueue*> (GetStatusQueue());
00189 if(statqueue)
00190 {
00191 if(!wait && statqueue->IsEmpty())
00192 return 0;
00193 obj=statqueue->WaitObjectFromBuffer();
00194 if(obj)
00195 {
00196 if(obj->InheritsFrom(TGo4Status::Class()))
00197 {
00198 stat= dynamic_cast<TGo4Status*>(obj);
00199 }
00200 else
00201 {
00202 TGo4Log::Debug(" !!! Master Task: NextStatus ERROR, unknown object %s from status queue!!! ",
00203 obj->GetName());
00204 delete obj;
00205 }
00206 }
00207 else
00208 {
00209 TGo4Log::Debug(" !!! Master Task NextStatus ERROR -- NULL object from data queue!!! ");
00210 }
00211 }
00212 else
00213 {
00214
00215 stat=0;
00216 }
00217 return stat;
00218 }
00219
00220
00221 TObject * TGo4Task::NextObject(Bool_t wait)
00222 {
00223 if(!IsMaster()) return 0;
00224 TObject* obj=0;
00225 TGo4BufferQueue* dataqueue=dynamic_cast<TGo4BufferQueue*> (GetDataQueue());
00226 if(dataqueue)
00227 {
00228 if(!wait && dataqueue->IsEmpty())
00229 return 0;
00230 obj=dataqueue->WaitObjectFromBuffer();
00231
00232 }
00233 else
00234 {
00235
00236 obj=0;
00237 }
00238 return obj;
00239 }
00240 void TGo4Task::AddUserCommand(TGo4Command* com)
00241 {
00242
00243 fxCommandPrototype->AddCommand(com);
00244 }
00245 void TGo4Task::AddUserCommandList(TGo4CommandProtoList * comlist)
00246 {
00247 if(comlist)
00248 {
00249 *fxCommandPrototype += *comlist;
00250
00251 delete comlist;
00252 comlist=0;
00253 }
00254 }
00255
00256 void TGo4Task::SendObject(TObject * obj, const char* receiver)
00257 {
00258 if(IsMaster()) return;
00259 if(obj)
00260 {
00261
00262 TGo4BufferQueue * dataq=GetDataQueue(receiver);
00263 if(dataq)
00264 {
00265
00266
00267 dataq->AddBufferFromObject(obj);
00268 }
00269 else
00270 {
00271 TGo4Log::Debug(" !!! Task - ERROR sending object - no data queue !!! ");
00272 }
00273 }
00274 else
00275 {
00276
00277 SendStatusMessage(2, kTRUE, "Task - object not found");
00278 }
00279 }
00280
00281 void TGo4Task::SendStatus(TGo4Status * stat, const char* receiver)
00282 {
00283 if(IsMaster()) return ;
00284 if(stat)
00285 {
00286
00287 TGo4BufferQueue * statq=GetStatusQueue(receiver);
00288 if(statq)
00289 {
00290 TGo4Log::Debug(" Task - sending status %s ", stat->ClassName());
00291 statq->AddBufferFromObject(stat);
00292 }
00293 else
00294 {
00295 TGo4Log::Debug(" !!! Task - ERROR sending status: no status queue !!! ");
00296 }
00297 }
00298 else
00299 {
00300
00301
00302 }
00303
00304
00305 }
00306 void TGo4Task::SendStatusBuffer()
00307 {
00308 if(IsMaster()) return;
00309 TGo4LockGuard statguard(fxStatusMutex);
00310 TGo4Log::Debug(" Task - sending status buffer ");
00311 TGo4BufferQueue * statq=GetStatusQueue();
00312 if(statq) statq->AddBuffer(fxStatusBuffer,kTRUE);
00313 }
00314 void TGo4Task::SendStatusMessage(Int_t level, Bool_t printout, const char* text,...)
00315 {
00316 if(IsMaster()) return;
00317 Int_t lbuflen=256;
00318
00319 char txtbuf[256];
00320 va_list args;
00321 va_start(args, text);
00322 vsnprintf(txtbuf, lbuflen, text, args);
00323 va_end(args);
00324
00325 const char* dest;
00326 char* curs=txtbuf;
00327 TString receiver=txtbuf;
00328 Ssiz_t pos=receiver.Index("::",2,0,TString::kExact);
00329 if(pos!=kNPOS)
00330 {
00331
00332 receiver.Resize(pos);
00333 dest=receiver.Data();
00334 curs += ((size_t) pos);
00335 curs +=2;
00336 }
00337 else
00338 {
00339 dest=0;
00340 }
00341
00342 Bool_t previousmode=TGo4Log::IsOutputEnabled();
00343 TGo4Log::OutputEnable(printout);
00344 const char* go4mess=TGo4Log::Message(level,curs);
00345 TGo4Log::OutputEnable(previousmode);
00346 if(level>0 && go4mess!=0)
00347 {
00348
00349 TGo4Status* message= new TGo4Status(go4mess);
00350 SendStatus(message,dest);
00351 delete message;
00352 }
00353 }
00354 void TGo4Task::UpdateStatusBuffer()
00355 {
00356 if(IsMaster()) return;
00357 TGo4LockGuard statguard(fxStatusMutex);
00358 TGo4LockGuard main;
00359 TFile *filsav = gFile;
00360 gFile = 0;
00361 TGo4TaskStatus* state=0;
00362 if(fxSlave)
00363 state=fxSlave->CreateStatus();
00364 else
00365 state=CreateStatus();
00366 fxStatusBuffer->Reset();
00367 fxStatusBuffer->InitMap();
00368 fxStatusBuffer->WriteObject(state);
00369 gFile = filsav;
00370 delete state;
00371 }
00372 TGo4Command* TGo4Task::NextCommand()
00373 {
00374 if(IsMaster()) return 0;
00375 TGo4Command* com=0;
00376 TObject* obj=0;
00377 TGo4BufferQueue * comq=GetCommandQueue();
00378 if(comq==0) return 0;
00379 if(!comq->IsEmpty() || (fxSlave!=0 && !fxSlave->MainIsRunning() ) )
00380 {
00381
00382
00383 obj=comq->WaitObjectFromBuffer();
00384 if(obj)
00385 {
00386 if(obj->InheritsFrom(TGo4Command::Class()))
00387 {
00388 com= dynamic_cast<TGo4Command*>(obj);
00389 com->SetTaskName("current");
00390 com->SetMode(kGo4ComModeController);
00391 }
00392 else
00393 {
00394 delete obj;
00395 }
00396 }
00397 else
00398 {
00399
00400 }
00401 }
00402 else
00403 {
00404 com=0;
00405 }
00406 return com;
00407 }
00408 Int_t TGo4Task::Initialization()
00409 {
00410
00411 Int_t rev=-1;
00412 if(fbInitDone)
00413
00414 {
00415 rev=0;
00416 }
00417 else
00418 {
00419 if(fxCommandPrototype==0)
00420 {
00421 if(fxMaster)
00422 {
00423 fxCommandPrototype=fxMaster->CreateCommandList();
00424 TGo4Log::Debug(" Task -- command list is created from Master factory");
00425 }
00426 else
00427 {
00428 fxCommandPrototype=CreateCommandList();
00429 TGo4Log::Debug(" Task -- command list is created from Task factory");
00430 }
00431 }
00432 rev=TGo4ThreadManager::Initialization();
00433 fxWorkHandler->Start(GetCommanderName());
00434 if(fxSlave) fxSlave->Initialization();
00435 }
00436 return rev;
00437 }
00438
00439 void TGo4Task::UpdateStatus(TGo4TaskStatus* state)
00440 {
00441 TGo4TaskHandlerStatus* taskhandlerstatus=0;
00442 TGo4TaskHandler* th=GetTaskHandler();
00443 if(th) taskhandlerstatus=th->CreateStatus();
00444 state->SetTaskHandlerStatus(taskhandlerstatus);
00445 state->SetFlags(fbAppBlocking, fbAutoCreate, fbAutoStart, fbTerminating, fbInitDone);
00446 }
00447
00448 TGo4TaskStatus* TGo4Task::CreateStatus()
00449 {
00450 TGo4TaskStatus* stat = new TGo4TaskStatus(GetName());
00451 UpdateStatus(stat);
00452 return stat;
00453 }
00454
00455
00456 Bool_t TGo4Task::SubmitCommand(const char* name)
00457 {
00458 if(!strcmp(name,"THEMQuit"))
00459 {
00460 return (SubmitEmergencyCommand(kComQuit));
00461 }
00462 else if (!strcmp(name,"THEMKill"))
00463 {
00464 return (SubmitEmergencyCommand(kComKillMain));
00465 }
00466 else if (!strcmp(name,"THEMRestart"))
00467 {
00468 return (SubmitEmergencyCommand(kComRestartMain));
00469 }
00470 else
00471 {
00472
00473 TGo4Command* com=MakeCommand(name);
00474 if(com==0)
00475 {
00476
00477 TGo4LockGuard mainlock;
00478 com= new TGo4RemoteCommand(name);
00479
00480 }
00481 return (SubmitCommand(com)) ;
00482 }
00483 }
00484 Bool_t TGo4Task::SubmitEmergencyCommand(Go4EmergencyCommand_t val)
00485 {
00486 TGo4BufferQueue* queue=GetCommandQueue();
00487 if(queue!=0)
00488 {
00489
00490 if(val==kComQuit)
00491 {
00492
00493 queue->AddBuffer(fxQuitBuffer,kTRUE);
00494 }
00495 else
00496 {
00497 TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00498 queue->AddBuffer(commandbuffer);
00499 }
00500
00501 return kTRUE;
00502 }
00503 return kFALSE;
00504 }
00505
00506 Bool_t TGo4Task::SubmitEmergencyData(Go4EmergencyCommand_t val, const char* receiver)
00507 {
00508 TGo4BufferQueue* queue=GetDataQueue(receiver);
00509 if(queue!=0)
00510 {
00511
00512 if(val==kComQuit)
00513 {
00514
00515 queue->AddBuffer(fxQuitBuffer,kTRUE);
00516 }
00517 else
00518 {
00519 TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00520 queue->AddBuffer(commandbuffer);
00521
00522 }
00523 return kTRUE;
00524 }
00525 return kFALSE;
00526 }
00527
00528 Bool_t TGo4Task::SubmitCommand(TGo4Command* com)
00529 {
00530 if (com==0) return kFALSE;
00531
00532 Bool_t rev=kTRUE;
00533 if(com->IsLocal())
00534 SubmitLocalCommand(com);
00535 else {
00536
00537 TGo4BufferQueue* queue=GetCommandQueue();
00538 if(queue!=0) {
00539
00540 TGo4LockGuard mainlock;
00541
00542 queue->AddBufferFromObject(com);
00543 } else
00544 rev = kFALSE;
00545 delete com;
00546 }
00547 return rev;
00548 }
00549
00550 TGo4TaskHandlerCommandList* TGo4Task::CreateCommandList()
00551 {
00552 return (new TGo4TaskHandlerCommandList("Go4ServerTaskDefaultCommandList") );
00553 }
00554
00555 TGo4Command* TGo4Task::MakeCommand(const char* name)
00556 {
00557 TGo4LockGuard mainlock;
00558 return ( fxCommandPrototype->MakeCommand(name) );
00559 }
00560
00561 Bool_t TGo4Task::SubmitLocalCommand(TGo4Command* com)
00562 {
00563 if(com==0) return kFALSE;
00564 com->SetMode(kGo4ComModeController);
00565 fxWorkHandler->Start(GetCommanderName());
00566 TGo4ObjectQueue* lqueue=GetLocalCommandQueue();
00567 if(lqueue!=0)
00568 lqueue->AddObject(com);
00569 else {
00570 delete com;
00571 return kFALSE;
00572 }
00573 return kTRUE;
00574 }
00575
00576 void TGo4Task::WakeCommandQueue(Int_t id)
00577 {
00578 if(GetTaskHandler() && GetTaskHandler()->IsAborting())
00579 {
00580
00581 return;
00582 }
00583
00584 TGo4Command* com=new TGo4Command("dummy","this wakes up queue",id);
00585 SubmitCommand(com);
00586 com=new TGo4Command("dummy","this wakes up queue",id);
00587 SubmitLocalCommand(com);
00588
00589
00590 }
00591
00592 void TGo4Task::GetStatus()
00593 {
00594 TGo4Log::Debug(" Task ''%s'' Send Status to Command Master ",GetName());
00595 TGo4BufferQueue* queue = GetStatusQueue();
00596 if(queue==0) return;
00597 {
00598 TGo4LockGuard mainguard;
00599
00600 TGo4TaskStatus* state=CreateStatus();
00601 queue->AddBufferFromObject(state);
00602 }
00603 }
00604
00605 Int_t TGo4Task::StartWorkThreads()
00606 {
00607 fbWorkIsStopped=kFALSE;
00608 if(fxOwner)
00609 return (fxOwner->StartWorkThreads());
00610 else
00611 return 0;
00612 }
00613
00614 Int_t TGo4Task::StopWorkThreads()
00615 {
00616 fbWorkIsStopped=kTRUE;
00617 if(fxOwner)
00618 return(fxOwner->StopWorkThreads());
00619 else
00620 return 0;
00621 }
00622
00623 void TGo4Task::SendStopBuffers(const char* taskname)
00624 {
00625 TGo4TaskHandler* th=GetTaskHandler();
00626 if(th==0) return;
00627 if(th->IsAborting())
00628 {
00629
00630 return;
00631 }
00632
00633 if(IsMaster())
00634 {
00635
00636 TGo4BufferQueue * comq=GetCommandQueue(taskname);
00637 if(comq)
00638 {
00639 comq->AddBuffer(fxStopBuffer,kTRUE);
00640 }
00641 }
00642 else
00643 {
00644
00645 TGo4BufferQueue * dataq=GetDataQueue(taskname);
00646 if(dataq)
00647 {
00648 dataq->AddBuffer(fxStopBuffer,kTRUE);
00649 }
00650 TGo4BufferQueue * statq=GetStatusQueue(taskname);
00651 if(statq)
00652 {
00653 statq->AddBuffer(fxStopBuffer,kTRUE);
00654 }
00655 }
00656 }
00657
00658 void TGo4Task::LockAll()
00659 {
00660 fxStatusMutex->Lock();
00661 TGo4LockGuard::LockMainMutex();
00662
00663 }
00664
00665 void TGo4Task::UnLockAll()
00666 {
00667 TGo4LockGuard::UnLockMainMutex();
00668 fxStatusMutex->UnLock();
00669 }
00670
00671 Int_t TGo4Task::Get_fgiTERMID()
00672 {
00673 return fgiTERMID;
00674 }
00675
00676
00677