00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4Task.h"
00015
00016 #include "TFile.h"
00017 #include "TMutex.h"
00018 #include "TROOT.h"
00019 #include "snprintf.h"
00020
00021 #include "TGo4Log.h"
00022 #include "TGo4Buffer.h"
00023 #include "TGo4LockGuard.h"
00024 #include "TGo4CommandInvoker.h"
00025 #include "TGo4RemoteCommand.h"
00026 #include "TGo4LocalCommandRunnable.h"
00027 #include "TGo4ObjectQueue.h"
00028 #include "TGo4BufferQueue.h"
00029 #include "TGo4ThreadHandler.h"
00030 #include "TGo4TaskStatus.h"
00031 #include "TGo4TaskHandler.h"
00032 #include "TGo4TaskHandlerCommandList.h"
00033 #include "TGo4Master.h"
00034 #include "TGo4Slave.h"
00035
00036 const Int_t TGo4Task::fgiTERMID=999;
00037
00038 TGo4Task::TGo4Task(const char* name, Bool_t blockingmode,
00039 Bool_t autostart,
00040 Bool_t autocreate,
00041 Bool_t ismaster)
00042 : TGo4ThreadManager(name,blockingmode, autostart,autocreate),
00043 fbCommandMaster(ismaster), fxMaster(0), fxSlave(0),fxOwner(0),
00044 fbWorkIsStopped(kFALSE),fxStopBuffer(0),fxQuitBuffer(0)
00045 {
00046 fxCommandPrototype=0;
00047 fxStatusBuffer= new TGo4Buffer(TBuffer::kWrite);
00048 fxStatusMutex= new TMutex(kTRUE);
00049 fxStopBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComCloseInput);
00050 fxQuitBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComQuit);
00051 fxAbortBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComAbortTask);
00052
00053 TGo4CommandInvoker::Instance();
00054 TGo4CommandInvoker::SetCommandList(CreateCommandList());
00055
00056 TGo4CommandInvoker::Register("NoReceiver", this);
00057 TGo4CommandInvoker::Register("Task",this);
00058
00059
00060
00061 fxLocalCommandQueue = new TGo4ObjectQueue("localcommands");
00062
00063
00064 TString nomen("LocalCommandRunnable of "); nomen+=name;
00065 TGo4LocalCommandRunnable* commander = new TGo4LocalCommandRunnable(nomen.Data(), this);
00066
00067 fxCommanderName = "COMMANDER-"; fxCommanderName += name;
00068 GetWorkHandler()->NewThread(GetCommanderName(), commander);
00069 }
00070
00071 TGo4Task::~TGo4Task()
00072 {
00073 if(fxOwner)
00074 {
00075 fxOwner->SetTask(0,kFALSE);
00076 delete fxOwner;
00077 }
00078 delete fxLocalCommandQueue;
00079 delete fxCommandPrototype;
00080 delete fxQuitBuffer;
00081 delete fxStopBuffer;
00082 delete fxStatusBuffer;
00083 delete fxStatusMutex;
00084 TGo4CommandInvoker::UnRegister(this);
00085 }
00086
00087 void TGo4Task::SetMaster(TGo4Master* m)
00088 {
00089 if(m!=0) SetMaster(kTRUE);
00090 fxMaster=m;
00091 }
00092
00093 void TGo4Task::SetSlave(TGo4Slave* s)
00094 {
00095 if(s!=0) SetMaster(kFALSE);
00096 fxSlave=s;
00097 }
00098
00099 void TGo4Task::SetOwner(TGo4TaskOwner* owner)
00100 {
00101 fxOwner=owner;
00102 SetMaster(dynamic_cast<TGo4Master*>(owner));
00103 SetSlave(dynamic_cast<TGo4Slave*>(owner));
00104 }
00105
00106 void TGo4Task::Start()
00107 {
00108 if(fxSlave) fxSlave->Start();
00109 }
00110
00111 void TGo4Task::Stop()
00112 {
00113 if(fxSlave) fxSlave->Stop();
00114 }
00115
00116 void TGo4Task::Quit()
00117 {
00118 if(fxSlave) fxSlave->Quit();
00119 }
00120
00121 void TGo4Task::KillMain()
00122 {
00123 if(fxSlave) fxSlave->KillMain();
00124
00125 }
00126 void TGo4Task::RestartMain()
00127 {
00128 if(fxSlave) fxSlave->RestartMain();
00129 }
00130
00131 void TGo4Task::Terminate (Bool_t termapp)
00132 {
00133 if(fxSlave)
00134 fxSlave->Terminate(termapp);
00135 else
00136 TGo4ThreadManager::Terminate(termapp);
00137 }
00138
00139 void TGo4Task::TerminateFast()
00140 {
00141 if(fxSlave)
00142 fxSlave->TerminateFast();
00143 else
00144 TGo4ThreadManager::TerminateFast();
00145 }
00146
00147 void TGo4Task::ExecuteString(const char* command)
00148 {
00149 if(fxSlave)
00150 fxSlave->ExecuteString(command);
00151 else
00152 gROOT->ProcessLineSync(command);
00153
00154 }
00155
00156 TGo4TaskHandler* TGo4Task::GetTaskHandler()
00157 {
00158 return 0;
00159 }
00160
00161 TGo4BufferQueue* TGo4Task::GetCommandQueue(const char*)
00162 {
00163 return 0;
00164 }
00165
00166 TGo4BufferQueue * TGo4Task::GetStatusQueue(const char*)
00167 {
00168 return 0;
00169 }
00170 TGo4BufferQueue * TGo4Task::GetDataQueue(const char*)
00171 {
00172 return 0;
00173 }
00174
00175 TGo4TaskHandlerCommandList * TGo4Task::GetPrototype()
00176 {
00177
00178 return fxCommandPrototype;
00179 }
00180
00181 TGo4Status * TGo4Task::NextStatus(Bool_t wait)
00182 {
00183 if(!IsMaster()) return 0;
00184 TObject* obj=0;
00185 TGo4Status* stat=0;
00186 TGo4BufferQueue* statqueue = dynamic_cast<TGo4BufferQueue*> (GetStatusQueue());
00187 if(statqueue)
00188 {
00189 if(!wait && statqueue->IsEmpty())
00190 return 0;
00191 obj=statqueue->WaitObjectFromBuffer();
00192 if(obj)
00193 {
00194 if(obj->InheritsFrom(TGo4Status::Class()))
00195 {
00196 stat= dynamic_cast<TGo4Status*>(obj);
00197 }
00198 else
00199 {
00200 TGo4Log::Debug(" !!! Master Task: NextStatus ERROR, unknown object %s from status queue!!! ",
00201 obj->GetName());
00202 delete obj;
00203 }
00204 }
00205 else
00206 {
00207 TGo4Log::Debug(" !!! Master Task NextStatus ERROR -- NULL object from data queue!!! ");
00208 }
00209 }
00210 else
00211 {
00212
00213 stat=0;
00214 }
00215 return stat;
00216 }
00217
00218
00219 TObject * TGo4Task::NextObject(Bool_t wait)
00220 {
00221 if(!IsMaster()) return 0;
00222 TObject* obj=0;
00223 TGo4BufferQueue* dataqueue=dynamic_cast<TGo4BufferQueue*> (GetDataQueue());
00224 if(dataqueue)
00225 {
00226 if(!wait && dataqueue->IsEmpty())
00227 return 0;
00228 obj=dataqueue->WaitObjectFromBuffer();
00229
00230 }
00231 else
00232 {
00233
00234 obj=0;
00235 }
00236 return obj;
00237 }
00238 void TGo4Task::AddUserCommand(TGo4Command* com)
00239 {
00240
00241 fxCommandPrototype->AddCommand(com);
00242 }
00243 void TGo4Task::AddUserCommandList(TGo4CommandProtoList * comlist)
00244 {
00245 if(comlist)
00246 {
00247 *fxCommandPrototype += *comlist;
00248
00249 delete comlist;
00250 comlist=0;
00251 }
00252 }
00253
00254 void TGo4Task::SendObject(TObject * obj, const char* receiver)
00255 {
00256 if(IsMaster()) return;
00257 if(obj)
00258 {
00259
00260 TGo4BufferQueue * dataq=GetDataQueue(receiver);
00261 if(dataq)
00262 {
00263
00264
00265 dataq->AddBufferFromObject(obj);
00266 }
00267 else
00268 {
00269 TGo4Log::Debug(" !!! Task - ERROR sending object - no data queue !!! ");
00270 }
00271 }
00272 else
00273 {
00274
00275 SendStatusMessage(2, kTRUE, "Task - object not found");
00276 }
00277 }
00278
00279 void TGo4Task::SendStatus(TGo4Status * stat, const char* receiver)
00280 {
00281 if(IsMaster()) return;
00282 if(stat) {
00283
00284 TGo4BufferQueue * statq=GetStatusQueue(receiver);
00285 if(statq) {
00286 TGo4Log::Debug(" Task - sending status %s ", stat->ClassName());
00287 statq->AddBufferFromObject(stat);
00288 } else {
00289 TGo4Log::Debug(" !!! Task - ERROR sending status: no status queue !!! ");
00290 }
00291 } else {
00292
00293 }
00294 }
00295
00296 void TGo4Task::SendStatusBuffer()
00297 {
00298 if(IsMaster()) return;
00299 TGo4LockGuard statguard(fxStatusMutex);
00300 TGo4Log::Debug(" Task - sending status buffer ");
00301 TGo4BufferQueue * statq=GetStatusQueue();
00302 if(statq) statq->AddBuffer(fxStatusBuffer,kTRUE);
00303 }
00304
00305 void TGo4Task::SendStatusMessage(Int_t level, Bool_t printout, const char* text,...)
00306 {
00307 if(IsMaster()) return;
00308 Int_t lbuflen=256;
00309
00310 char txtbuf[256];
00311 va_list args;
00312 va_start(args, text);
00313 vsnprintf(txtbuf, lbuflen, text, args);
00314 va_end(args);
00315
00316 const char* dest;
00317 char* curs=txtbuf;
00318 TString receiver=txtbuf;
00319 Ssiz_t pos=receiver.Index("::",2,0,TString::kExact);
00320 if(pos!=kNPOS)
00321 {
00322
00323 receiver.Resize(pos);
00324 dest=receiver.Data();
00325 curs += ((size_t) pos);
00326 curs +=2;
00327 }
00328 else
00329 {
00330 dest=0;
00331 }
00332
00333 Bool_t previousmode=TGo4Log::IsOutputEnabled();
00334 TGo4Log::OutputEnable(printout);
00335 const char* go4mess=TGo4Log::Message(level,curs);
00336 TGo4Log::OutputEnable(previousmode);
00337 if((level>0) && (go4mess!=0)) {
00338
00339 TGo4Status* message = new TGo4Status(go4mess);
00340 SendStatus(message, dest);
00341 delete message;
00342 }
00343 }
00344 void TGo4Task::UpdateStatusBuffer()
00345 {
00346 if(IsMaster()) return;
00347 TGo4LockGuard statguard(fxStatusMutex);
00348 TGo4LockGuard main;
00349 TFile *filsav = gFile;
00350 gFile = 0;
00351 TGo4TaskStatus* state=0;
00352 if(fxSlave)
00353 state=fxSlave->CreateStatus();
00354 else
00355 state=CreateStatus();
00356 fxStatusBuffer->Reset();
00357 fxStatusBuffer->InitMap();
00358 fxStatusBuffer->WriteObject(state);
00359 gFile = filsav;
00360 delete state;
00361 }
00362 TGo4Command* TGo4Task::NextCommand()
00363 {
00364 if(IsMaster()) return 0;
00365 TGo4Command* com=0;
00366 TObject* obj=0;
00367 TGo4BufferQueue * comq=GetCommandQueue();
00368 if(comq==0) return 0;
00369 if(!comq->IsEmpty() || (fxSlave!=0 && !fxSlave->MainIsRunning() ) )
00370 {
00371
00372
00373 obj=comq->WaitObjectFromBuffer();
00374 if(obj)
00375 {
00376 if(obj->InheritsFrom(TGo4Command::Class()))
00377 {
00378 com= dynamic_cast<TGo4Command*>(obj);
00379 com->SetTaskName("current");
00380 com->SetMode(kGo4ComModeController);
00381 }
00382 else
00383 {
00384 delete obj;
00385 }
00386 }
00387 else
00388 {
00389
00390 }
00391 }
00392 else
00393 {
00394 com=0;
00395 }
00396 return com;
00397 }
00398 Int_t TGo4Task::Initialization()
00399 {
00400
00401 Int_t rev=-1;
00402 if(fbInitDone)
00403
00404 {
00405 rev=0;
00406 }
00407 else
00408 {
00409 if(fxCommandPrototype==0)
00410 {
00411 if(fxMaster)
00412 {
00413 fxCommandPrototype=fxMaster->CreateCommandList();
00414 TGo4Log::Debug(" Task -- command list is created from Master factory");
00415 }
00416 else
00417 {
00418 fxCommandPrototype=CreateCommandList();
00419 TGo4Log::Debug(" Task -- command list is created from Task factory");
00420 }
00421 }
00422 rev=TGo4ThreadManager::Initialization();
00423 fxWorkHandler->Start(GetCommanderName());
00424 if(fxSlave) fxSlave->Initialization();
00425 }
00426 return rev;
00427 }
00428
00429 void TGo4Task::UpdateStatus(TGo4TaskStatus* state)
00430 {
00431 TGo4TaskHandlerStatus* taskhandlerstatus=0;
00432 TGo4TaskHandler* th=GetTaskHandler();
00433 if(th) taskhandlerstatus=th->CreateStatus();
00434 state->SetTaskHandlerStatus(taskhandlerstatus);
00435 state->SetFlags(fbAppBlocking, fbAutoCreate, fbAutoStart, fbTerminating, fbInitDone);
00436 }
00437
00438 TGo4TaskStatus* TGo4Task::CreateStatus()
00439 {
00440 TGo4TaskStatus* stat = new TGo4TaskStatus(GetName());
00441 UpdateStatus(stat);
00442 return stat;
00443 }
00444
00445
00446 Bool_t TGo4Task::SubmitCommand(const char* name)
00447 {
00448 if(!strcmp(name,"THEMQuit"))
00449 {
00450 return (SubmitEmergencyCommand(kComQuit));
00451 }
00452 else if (!strcmp(name,"THEMKill"))
00453 {
00454 return (SubmitEmergencyCommand(kComKillMain));
00455 }
00456 else if (!strcmp(name,"THEMRestart"))
00457 {
00458 return (SubmitEmergencyCommand(kComRestartMain));
00459 }
00460 else
00461 {
00462
00463 TGo4Command* com=MakeCommand(name);
00464 if(com==0)
00465 {
00466
00467 TGo4LockGuard mainlock;
00468 com= new TGo4RemoteCommand(name);
00469
00470 }
00471 return (SubmitCommand(com)) ;
00472 }
00473 }
00474 Bool_t TGo4Task::SubmitEmergencyCommand(Go4EmergencyCommand_t val)
00475 {
00476 TGo4BufferQueue* queue=GetCommandQueue();
00477 if(queue!=0)
00478 {
00479
00480 if(val==kComQuit)
00481 {
00482
00483 queue->AddBuffer(fxQuitBuffer,kTRUE);
00484 }
00485 else
00486 {
00487 TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00488 queue->AddBuffer(commandbuffer);
00489 }
00490
00491 return kTRUE;
00492 }
00493 return kFALSE;
00494 }
00495
00496 Bool_t TGo4Task::SubmitEmergencyData(Go4EmergencyCommand_t val, const char* receiver)
00497 {
00498 TGo4BufferQueue* queue=GetDataQueue(receiver);
00499 if(queue!=0)
00500 {
00501
00502 if(val==kComQuit)
00503 {
00504
00505 queue->AddBuffer(fxQuitBuffer,kTRUE);
00506 }
00507 else
00508 {
00509 TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00510 queue->AddBuffer(commandbuffer);
00511
00512 }
00513 return kTRUE;
00514 }
00515 return kFALSE;
00516 }
00517
00518 Bool_t TGo4Task::SubmitCommand(TGo4Command* com)
00519 {
00520 if (com==0) return kFALSE;
00521
00522 Bool_t rev=kTRUE;
00523 if(com->IsLocal())
00524 SubmitLocalCommand(com);
00525 else {
00526
00527 TGo4BufferQueue* queue=GetCommandQueue();
00528 if(queue!=0) {
00529
00530 TGo4LockGuard mainlock;
00531
00532 queue->AddBufferFromObject(com);
00533 } else
00534 rev = kFALSE;
00535 delete com;
00536 }
00537 return rev;
00538 }
00539
00540 TGo4TaskHandlerCommandList* TGo4Task::CreateCommandList()
00541 {
00542 return (new TGo4TaskHandlerCommandList("Go4ServerTaskDefaultCommandList") );
00543 }
00544
00545 TGo4Command* TGo4Task::MakeCommand(const char* name)
00546 {
00547 TGo4LockGuard mainlock;
00548 return ( fxCommandPrototype->MakeCommand(name) );
00549 }
00550
00551 Bool_t TGo4Task::SubmitLocalCommand(TGo4Command* com)
00552 {
00553 if(com==0) return kFALSE;
00554 com->SetMode(kGo4ComModeAdministrator);
00555 fxWorkHandler->Start(GetCommanderName());
00556 TGo4ObjectQueue* lqueue=GetLocalCommandQueue();
00557 if(lqueue!=0)
00558 lqueue->AddObject(com);
00559 else {
00560 delete com;
00561 return kFALSE;
00562 }
00563 return kTRUE;
00564 }
00565
00566 void TGo4Task::WakeCommandQueue(Int_t id)
00567 {
00568 if(GetTaskHandler() && GetTaskHandler()->IsAborting())
00569 {
00570
00571 return;
00572 }
00573
00574 TGo4Command* com=new TGo4Command("dummy","this wakes up queue",id);
00575 SubmitCommand(com);
00576 com=new TGo4Command("dummy","this wakes up queue",id);
00577 SubmitLocalCommand(com);
00578
00579
00580 }
00581
00582 void TGo4Task::GetStatus()
00583 {
00584 TGo4Log::Debug(" Task ''%s'' Send Status to Command Master ",GetName());
00585 TGo4BufferQueue* queue = GetStatusQueue();
00586 if(queue==0) return;
00587 {
00588 TGo4LockGuard mainguard;
00589
00590 TGo4TaskStatus* state=CreateStatus();
00591 queue->AddBufferFromObject(state);
00592 }
00593 }
00594
00595 Int_t TGo4Task::StartWorkThreads()
00596 {
00597 fbWorkIsStopped=kFALSE;
00598 if(fxOwner)
00599 return (fxOwner->StartWorkThreads());
00600 else
00601 return 0;
00602 }
00603
00604 Int_t TGo4Task::StopWorkThreads()
00605 {
00606 fbWorkIsStopped=kTRUE;
00607 if(fxOwner)
00608 return(fxOwner->StopWorkThreads());
00609 else
00610 return 0;
00611 }
00612
00613 void TGo4Task::SendStopBuffers(const char* taskname)
00614 {
00615 TGo4TaskHandler* th=GetTaskHandler();
00616 if(th==0) return;
00617 if(th->IsAborting())
00618 {
00619
00620 return;
00621 }
00622
00623 if(IsMaster())
00624 {
00625
00626 TGo4BufferQueue * comq=GetCommandQueue(taskname);
00627 if(comq)
00628 {
00629 comq->AddBuffer(fxStopBuffer,kTRUE);
00630 }
00631 }
00632 else
00633 {
00634
00635 TGo4BufferQueue * dataq=GetDataQueue(taskname);
00636 if(dataq)
00637 {
00638 dataq->AddBuffer(fxStopBuffer,kTRUE);
00639 }
00640 TGo4BufferQueue * statq=GetStatusQueue(taskname);
00641 if(statq)
00642 {
00643 statq->AddBuffer(fxStopBuffer,kTRUE);
00644 }
00645 }
00646 }
00647
00648 void TGo4Task::LockAll()
00649 {
00650 fxStatusMutex->Lock();
00651 TGo4LockGuard::LockMainMutex();
00652
00653 }
00654
00655 void TGo4Task::UnLockAll()
00656 {
00657 TGo4LockGuard::UnLockMainMutex();
00658 fxStatusMutex->UnLock();
00659 }
00660
00661 Int_t TGo4Task::Get_fgiTERMID()
00662 {
00663 return fgiTERMID;
00664 }
00665