00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4Task.h"
00015
00016 #include "TFile.h"
00017 #include "TMutex.h"
00018 #include "TROOT.h"
00019 #include "snprintf.h"
00020
00021 #include "TGo4Log.h"
00022 #include "TGo4Buffer.h"
00023 #include "TGo4LockGuard.h"
00024 #include "TGo4CommandInvoker.h"
00025 #include "TGo4RemoteCommand.h"
00026 #include "TGo4LocalCommandRunnable.h"
00027 #include "TGo4ObjectQueue.h"
00028 #include "TGo4BufferQueue.h"
00029 #include "TGo4ThreadHandler.h"
00030 #include "TGo4TaskStatus.h"
00031 #include "TGo4TaskHandler.h"
00032 #include "TGo4TaskHandlerCommandList.h"
00033 #include "TGo4Master.h"
00034 #include "TGo4Slave.h"
00035
00036 const Int_t TGo4Task::fgiTERMID=999;
00037
00038 TGo4Task::TGo4Task(const char* name, Bool_t blockingmode,
00039 Bool_t autostart,
00040 Bool_t autocreate,
00041 Bool_t ismaster) :
00042 TGo4ThreadManager(name,blockingmode, autostart,autocreate),
00043 fbCommandMaster(ismaster), fxMaster(0), fxSlave(0),fxOwner(0),
00044 fbWorkIsStopped(kFALSE),fxStopBuffer(0),fxQuitBuffer(0)
00045 {
00046 fxCommandPrototype=0;
00047 fxStatusBuffer= new TGo4Buffer(TBuffer::kWrite);
00048 fxStatusMutex= new TMutex(kTRUE);
00049 fxStopBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComCloseInput);
00050 fxQuitBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComQuit);
00051 fxAbortBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComAbortTask);
00052
00053 TGo4CommandInvoker::Instance();
00054 TGo4CommandInvoker::SetCommandList(CreateCommandList());
00055
00056 TGo4CommandInvoker::Register("NoReceiver", this);
00057 TGo4CommandInvoker::Register("Task",this);
00058
00059
00060
00061 fxLocalCommandQueue = new TGo4ObjectQueue("localcommands");
00062
00063
00064 TString nomen("LocalCommandRunnable of "); nomen+=name;
00065 TGo4LocalCommandRunnable* commander = new TGo4LocalCommandRunnable(nomen.Data(), this);
00066
00067 fxCommanderName = "COMMANDER-"; fxCommanderName += name;
00068 GetWorkHandler()->NewThread(GetCommanderName(), commander);
00069 }
00070
00071 TGo4Task::~TGo4Task()
00072 {
00073 if(fxOwner) {
00074 fxOwner->SetTask(0,kFALSE);
00075 delete fxOwner;
00076 }
00077 delete fxLocalCommandQueue;
00078 delete fxCommandPrototype;
00079 delete fxQuitBuffer;
00080 delete fxStopBuffer;
00081 delete fxStatusBuffer;
00082 delete fxStatusMutex;
00083 TGo4CommandInvoker::UnRegister(this);
00084 }
00085
00086 void TGo4Task::SetMaster(TGo4Master* m)
00087 {
00088 if(m!=0) SetMaster(kTRUE);
00089 fxMaster=m;
00090 }
00091
00092 void TGo4Task::SetSlave(TGo4Slave* s)
00093 {
00094 if(s!=0) SetMaster(kFALSE);
00095 fxSlave=s;
00096 }
00097
00098 void TGo4Task::SetOwner(TGo4TaskOwner* owner)
00099 {
00100 fxOwner=owner;
00101 SetMaster(dynamic_cast<TGo4Master*>(owner));
00102 SetSlave(dynamic_cast<TGo4Slave*>(owner));
00103 }
00104
00105 void TGo4Task::Start()
00106 {
00107 if(fxSlave) fxSlave->Start();
00108 }
00109
00110 void TGo4Task::Stop()
00111 {
00112 if(fxSlave) fxSlave->Stop();
00113 }
00114
00115 void TGo4Task::Quit()
00116 {
00117 if(fxSlave) fxSlave->Quit();
00118 }
00119
00120 void TGo4Task::KillMain()
00121 {
00122 if(fxSlave) fxSlave->KillMain();
00123
00124 }
00125 void TGo4Task::RestartMain()
00126 {
00127 if(fxSlave) fxSlave->RestartMain();
00128 }
00129
00130 void TGo4Task::Terminate (Bool_t termapp)
00131 {
00132 if(fxSlave)
00133 fxSlave->Terminate(termapp);
00134 else
00135 TGo4ThreadManager::Terminate(termapp);
00136 }
00137
00138 void TGo4Task::TerminateFast()
00139 {
00140 if(fxSlave)
00141 fxSlave->TerminateFast();
00142 else
00143 TGo4ThreadManager::TerminateFast();
00144 }
00145
00146 void TGo4Task::ExecuteString(const char* command)
00147 {
00148 if(fxSlave)
00149 fxSlave->ExecuteString(command);
00150 else
00151 gROOT->ProcessLineSync(command);
00152 }
00153
00154 TGo4TaskHandler* TGo4Task::GetTaskHandler()
00155 {
00156 return 0;
00157 }
00158
00159 TGo4BufferQueue* TGo4Task::GetCommandQueue(const char*)
00160 {
00161 return 0;
00162 }
00163
00164 TGo4BufferQueue * TGo4Task::GetStatusQueue(const char*)
00165 {
00166 return 0;
00167 }
00168 TGo4BufferQueue * TGo4Task::GetDataQueue(const char*)
00169 {
00170 return 0;
00171 }
00172
00173 TGo4TaskHandlerCommandList * TGo4Task::GetPrototype()
00174 {
00175
00176 return fxCommandPrototype;
00177 }
00178
00179 TGo4Status * TGo4Task::NextStatus(Bool_t wait)
00180 {
00181 if(!IsMaster()) return 0;
00182 TObject* obj=0;
00183 TGo4Status* stat=0;
00184 TGo4BufferQueue* statqueue = dynamic_cast<TGo4BufferQueue*> (GetStatusQueue());
00185 if(statqueue)
00186 {
00187
00188 if(!wait && statqueue->IsEmpty()) return 0;
00189 obj = statqueue->WaitObjectFromBuffer();
00190 if(obj)
00191 {
00192 if(obj->InheritsFrom(TGo4Status::Class()))
00193 {
00194 stat= dynamic_cast<TGo4Status*>(obj);
00195 }
00196 else
00197 {
00198 TGo4Log::Debug(" !!! Master Task: NextStatus ERROR, unknown object %s from status queue!!! ",
00199 obj->GetName());
00200 delete obj;
00201 }
00202 }
00203 else
00204 {
00205 TGo4Log::Debug(" !!! Master Task NextStatus ERROR -- NULL object from data queue!!! ");
00206 }
00207 }
00208 else
00209 {
00210
00211 stat=0;
00212 }
00213 return stat;
00214 }
00215
00216
00217 TObject * TGo4Task::NextObject(Bool_t wait)
00218 {
00219 if(!IsMaster()) return 0;
00220 TObject* obj=0;
00221 TGo4BufferQueue* dataqueue=dynamic_cast<TGo4BufferQueue*> (GetDataQueue());
00222 if(dataqueue)
00223 {
00224 if(!wait && dataqueue->IsEmpty())
00225 return 0;
00226 obj=dataqueue->WaitObjectFromBuffer();
00227
00228 }
00229 else
00230 {
00231
00232 obj=0;
00233 }
00234 return obj;
00235 }
00236
00237 void TGo4Task::AddUserCommand(TGo4Command* com)
00238 {
00239 fxCommandPrototype->AddCommand(com);
00240 }
00241
00242 void TGo4Task::AddUserCommandList(TGo4CommandProtoList * comlist)
00243 {
00244 if(comlist)
00245 {
00246 *fxCommandPrototype += *comlist;
00247
00248 delete comlist;
00249 comlist=0;
00250 }
00251 }
00252
00253 void TGo4Task::SendObject(TObject * obj, const char* receiver)
00254 {
00255 if(IsMaster()) return;
00256 if(obj) {
00257
00258 TGo4BufferQueue * dataq=GetDataQueue(receiver);
00259 if(dataq)
00260 {
00261
00262
00263 dataq->AddBufferFromObject(obj);
00264 }
00265 else
00266 {
00267 TGo4Log::Debug(" !!! Task - ERROR sending object - no data queue !!! ");
00268 }
00269 }
00270 else
00271 {
00272
00273 SendStatusMessage(2, kTRUE, "Task - object not found");
00274 }
00275 }
00276
00277 void TGo4Task::SendStatus(TGo4Status * stat, const char* receiver)
00278 {
00279 if(IsMaster()) return;
00280 if(stat) {
00281
00282 TGo4BufferQueue * statq=GetStatusQueue(receiver);
00283 if(statq) {
00284 TGo4Log::Debug(" Task - sending status %s ", stat->ClassName());
00285 statq->AddBufferFromObject(stat);
00286 } else {
00287 TGo4Log::Debug(" !!! Task - ERROR sending status: no status queue !!! ");
00288 }
00289 } else {
00290
00291 }
00292 }
00293
00294 void TGo4Task::SendStatusBuffer()
00295 {
00296 if(IsMaster()) return;
00297 TGo4LockGuard statguard(fxStatusMutex);
00298 TGo4Log::Debug(" Task - sending status buffer ");
00299 TGo4BufferQueue * statq=GetStatusQueue();
00300 if(statq) statq->AddBuffer(fxStatusBuffer,kTRUE);
00301 }
00302
00303 void TGo4Task::SendStatusMessage(Int_t level, Bool_t printout, const char* text,...)
00304 {
00305 if(IsMaster()) return;
00306 Int_t lbuflen=256;
00307
00308 char txtbuf[256];
00309 va_list args;
00310 va_start(args, text);
00311 vsnprintf(txtbuf, lbuflen, text, args);
00312 va_end(args);
00313
00314 const char* dest;
00315 char* curs=txtbuf;
00316 TString receiver=txtbuf;
00317 Ssiz_t pos=receiver.Index("::",2,0,TString::kExact);
00318 if(pos!=kNPOS) {
00319
00320 receiver.Resize(pos);
00321 dest=receiver.Data();
00322 curs += ((size_t) pos);
00323 curs +=2;
00324 } else {
00325 dest = 0;
00326 }
00327
00328 Bool_t previousmode = TGo4Log::IsOutputEnabled();
00329 TGo4Log::OutputEnable(printout);
00330 const char* go4mess = TGo4Log::Message(level, curs);
00331 TGo4Log::OutputEnable(previousmode);
00332 if((level>0) && (go4mess!=0)) {
00333
00334 TGo4Status* message = new TGo4Status(go4mess);
00335 SendStatus(message, dest);
00336 delete message;
00337 }
00338 }
00339
00340 void TGo4Task::UpdateStatusBuffer()
00341 {
00342 if(IsMaster()) return;
00343 TGo4LockGuard statguard(fxStatusMutex);
00344 TGo4LockGuard main;
00345 TFile *filsav = gFile;
00346 gFile = 0;
00347
00348 TGo4TaskStatus* state = fxSlave ? fxSlave->CreateStatus() : CreateStatus();
00349
00350 fxStatusBuffer->Reset();
00351 fxStatusBuffer->InitMap();
00352 fxStatusBuffer->WriteObject(state);
00353 gFile = filsav;
00354 delete state;
00355 }
00356
00357 TGo4Command* TGo4Task::NextCommand()
00358 {
00359 if(IsMaster()) return 0;
00360 TGo4Command* com=0;
00361 TGo4BufferQueue * comq = GetCommandQueue();
00362 if(comq==0) return 0;
00363 if(!comq->IsEmpty() || (fxSlave!=0 && !fxSlave->MainIsRunning() ) )
00364 {
00365
00366
00367 TObject* obj = comq->WaitObjectFromBuffer();
00368 if(obj)
00369 {
00370 if(obj->InheritsFrom(TGo4Command::Class()))
00371 {
00372 com = dynamic_cast<TGo4Command*>(obj);
00373 com->SetTaskName("current");
00374 com->SetMode(kGo4ComModeController);
00375 }
00376 else
00377 {
00378 delete obj;
00379 }
00380 }
00381 else
00382 {
00383
00384 }
00385 }
00386 else
00387 {
00388 com=0;
00389 }
00390 return com;
00391 }
00392
00393 Int_t TGo4Task::Initialization()
00394 {
00395
00396 Int_t rev=-1;
00397 if(fbInitDone)
00398
00399 {
00400 rev=0;
00401 }
00402 else
00403 {
00404 if(fxCommandPrototype==0)
00405 {
00406 if(fxMaster)
00407 {
00408 fxCommandPrototype=fxMaster->CreateCommandList();
00409 TGo4Log::Debug(" Task -- command list is created from Master factory");
00410 }
00411 else
00412 {
00413 fxCommandPrototype=CreateCommandList();
00414 TGo4Log::Debug(" Task -- command list is created from Task factory");
00415 }
00416 }
00417 rev=TGo4ThreadManager::Initialization();
00418 fxWorkHandler->Start(GetCommanderName());
00419 if(fxSlave) fxSlave->Initialization();
00420 }
00421 return rev;
00422 }
00423
00424 void TGo4Task::UpdateStatus(TGo4TaskStatus* state)
00425 {
00426 TGo4TaskHandlerStatus* taskhandlerstatus=0;
00427 TGo4TaskHandler* th=GetTaskHandler();
00428 if(th) taskhandlerstatus=th->CreateStatus();
00429 state->SetTaskHandlerStatus(taskhandlerstatus);
00430 state->SetFlags(fbAppBlocking, fbAutoCreate, fbAutoStart, fbTerminating, fbInitDone);
00431 }
00432
00433 TGo4TaskStatus* TGo4Task::CreateStatus()
00434 {
00435 TGo4TaskStatus* stat = new TGo4TaskStatus(GetName());
00436 UpdateStatus(stat);
00437 return stat;
00438 }
00439
00440 Bool_t TGo4Task::SubmitCommand(const char* name)
00441 {
00442 if(!strcmp(name,"THEMQuit"))
00443 {
00444 return (SubmitEmergencyCommand(kComQuit));
00445 }
00446 else if (!strcmp(name,"THEMKill"))
00447 {
00448 return (SubmitEmergencyCommand(kComKillMain));
00449 }
00450 else if (!strcmp(name,"THEMRestart"))
00451 {
00452 return (SubmitEmergencyCommand(kComRestartMain));
00453 }
00454 else
00455 {
00456
00457 TGo4Command* com = MakeCommand(name);
00458 if(com==0)
00459 {
00460
00461 TGo4LockGuard mainlock;
00462 com = new TGo4RemoteCommand(name);
00463
00464 }
00465 return (SubmitCommand(com)) ;
00466 }
00467 }
00468
00469 Bool_t TGo4Task::SubmitEmergencyCommand(Go4EmergencyCommand_t val)
00470 {
00471 TGo4BufferQueue* queue = GetCommandQueue();
00472 if(queue!=0)
00473 {
00474
00475 if(val==kComQuit)
00476 {
00477
00478 queue->AddBuffer(fxQuitBuffer,kTRUE);
00479 }
00480 else
00481 {
00482 TBuffer* commandbuffer = TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00483 queue->AddBuffer(commandbuffer);
00484 }
00485
00486 return kTRUE;
00487 }
00488 return kFALSE;
00489 }
00490
00491 Bool_t TGo4Task::SubmitEmergencyData(Go4EmergencyCommand_t val, const char* receiver)
00492 {
00493 TGo4BufferQueue* queue=GetDataQueue(receiver);
00494 if(queue!=0)
00495 {
00496
00497 if(val==kComQuit)
00498 {
00499
00500 queue->AddBuffer(fxQuitBuffer,kTRUE);
00501 }
00502 else
00503 {
00504 TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00505 queue->AddBuffer(commandbuffer);
00506
00507 }
00508 return kTRUE;
00509 }
00510 return kFALSE;
00511 }
00512
00513 Bool_t TGo4Task::SubmitCommand(TGo4Command* com)
00514 {
00515 if (com==0) return kFALSE;
00516
00517 Bool_t rev=kTRUE;
00518 if(com->IsLocal())
00519 SubmitLocalCommand(com);
00520 else {
00521
00522 TGo4BufferQueue* queue=GetCommandQueue();
00523 if(queue!=0) {
00524
00525 TGo4LockGuard mainlock;
00526
00527 queue->AddBufferFromObject(com);
00528 } else
00529 rev = kFALSE;
00530 delete com;
00531 }
00532 return rev;
00533 }
00534
00535 TGo4TaskHandlerCommandList* TGo4Task::CreateCommandList()
00536 {
00537 return (new TGo4TaskHandlerCommandList("Go4ServerTaskDefaultCommandList") );
00538 }
00539
00540 TGo4Command* TGo4Task::MakeCommand(const char* name)
00541 {
00542 TGo4LockGuard mainlock;
00543 return ( fxCommandPrototype->MakeCommand(name) );
00544 }
00545
00546 Bool_t TGo4Task::SubmitLocalCommand(TGo4Command* com)
00547 {
00548 if(com==0) return kFALSE;
00549 com->SetMode(kGo4ComModeAdministrator);
00550 fxWorkHandler->Start(GetCommanderName());
00551 TGo4ObjectQueue* lqueue = GetLocalCommandQueue();
00552 if(lqueue==0) {
00553 delete com;
00554 return kFALSE;
00555 }
00556 lqueue->AddObject(com);
00557 return kTRUE;
00558 }
00559
00560 void TGo4Task::WakeCommandQueue(Int_t id)
00561 {
00562 if(GetTaskHandler() && GetTaskHandler()->IsAborting())
00563 {
00564
00565 return;
00566 }
00567
00568 TGo4Command* com=new TGo4Command("dummy","this wakes up queue",id);
00569 SubmitCommand(com);
00570 com=new TGo4Command("dummy","this wakes up queue",id);
00571 SubmitLocalCommand(com);
00572
00573
00574 }
00575
00576 void TGo4Task::GetStatus()
00577 {
00578 TGo4Log::Debug(" Task ''%s'' Send Status to Command Master ",GetName());
00579 TGo4BufferQueue* queue = GetStatusQueue();
00580 if(queue==0) return;
00581 {
00582 TGo4LockGuard mainguard;
00583
00584 TGo4TaskStatus* state=CreateStatus();
00585 queue->AddBufferFromObject(state);
00586 }
00587 }
00588
00589 Int_t TGo4Task::StartWorkThreads()
00590 {
00591 fbWorkIsStopped = kFALSE;
00592 return fxOwner ? fxOwner->StartWorkThreads() : 0;
00593 }
00594
00595 Int_t TGo4Task::StopWorkThreads()
00596 {
00597 fbWorkIsStopped=kTRUE;
00598 return fxOwner ? fxOwner->StopWorkThreads() : 0;
00599 }
00600
00601 void TGo4Task::SendStopBuffers(const char* taskname)
00602 {
00603 TGo4TaskHandler* th = GetTaskHandler();
00604 if(th==0) return;
00605 if(th->IsAborting()) {
00606
00607 return;
00608 }
00609
00610 if(IsMaster())
00611 {
00612
00613 TGo4BufferQueue * comq=GetCommandQueue(taskname);
00614 if(comq)
00615 {
00616 comq->AddBuffer(fxStopBuffer,kTRUE);
00617 }
00618 }
00619 else
00620 {
00621
00622 TGo4BufferQueue * dataq=GetDataQueue(taskname);
00623 if(dataq)
00624 {
00625 dataq->AddBuffer(fxStopBuffer,kTRUE);
00626 }
00627 TGo4BufferQueue * statq=GetStatusQueue(taskname);
00628 if(statq)
00629 {
00630 statq->AddBuffer(fxStopBuffer,kTRUE);
00631 }
00632 }
00633 }
00634
00635 void TGo4Task::LockAll()
00636 {
00637 fxStatusMutex->Lock();
00638 TGo4LockGuard::LockMainMutex();
00639
00640 }
00641
00642 void TGo4Task::UnLockAll()
00643 {
00644 TGo4LockGuard::UnLockMainMutex();
00645 fxStatusMutex->UnLock();
00646 }
00647
00648 Int_t TGo4Task::Get_fgiTERMID()
00649 {
00650 return fgiTERMID;
00651 }
00652