00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "TGo4Task.h"
00019
00020 #include "TFile.h"
00021 #include "TBuffer.h"
00022 #include "TMutex.h"
00023
00024 #include "Go4Log/TGo4Log.h"
00025 #include "Go4LockGuard/TGo4LockGuard.h"
00026 #include "Go4CommandsBase/TGo4CommandInvoker.h"
00027 #include "Go4CommandsTaskHandler/TGo4TaskHandlerCommandList.h"
00028 #include "TGo4Master.h"
00029 #include "TGo4Slave.h"
00030 #include "TGo4LocalCommandRunnable.h"
00031 #include "Go4Queue/TGo4ObjectQueue.h"
00032 #include "Go4Queue/TGo4BufferQueue.h"
00033 #include "TGo4TaskOwner.h"
00034 #include <Rtypes.h>
00035 #include "Go4CommandsTaskHandler/TGo4TaskHandlerCommandList.h"
00036 #include "Go4Socket/TGo4Socket.h"
00037 #include <TMutex.h>
00038 #include <TBuffer.h>
00039
00040 const Int_t TGo4Task::fgiTERMID=999;
00041
00042 TGo4Task::TGo4Task(const char* name, Bool_t blockingmode,
00043 Bool_t autostart,
00044 Bool_t autocreate,
00045 Bool_t ismaster)
00046 : TGo4ThreadManager(name,blockingmode, autostart,autocreate),
00047 fbCommandMaster(ismaster), fxMaster(0), fxSlave(0),fxOwner(0),
00048 fbWorkIsStopped(kFALSE),fxStopBuffer(0),fxQuitBuffer(0)
00049 {
00050 fxCommandPrototype=0;
00051 fxStatusBuffer= new TBuffer(TBuffer::kWrite);
00052 fxStatusMutex= new TMutex;
00053 fxStopBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComCloseInput);
00054 fxQuitBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComQuit);
00055
00056 TGo4CommandInvoker::Instance();
00057 TGo4CommandInvoker::Register("NoReceiver", this);
00058 TGo4CommandInvoker::Register("Task",this);
00059
00060
00061
00062 fxLocalCommandQueue = new TGo4ObjectQueue("localcommands");
00063
00064
00065 Text_t nomen[TGo4ThreadManager::fguTEXTLENGTH];
00066 snprintf(nomen,TGo4ThreadManager::fguTEXTLENGTH-1, "LocalCommandRunnable of %s", name);
00067 TGo4LocalCommandRunnable* commander = new TGo4LocalCommandRunnable(nomen,this);
00068 snprintf(nomen,TGo4ThreadManager::fguTEXTLENGTH-1, "COMMANDER-%s", name);
00069 fxCommanderName=nomen;
00070 GetWorkHandler()->NewThread(GetCommanderName(), commander);
00071 }
00072
00073 TGo4Task::~TGo4Task()
00074 {
00075 if(fxOwner)
00076 {
00077 fxOwner->SetTask(0,kFALSE);
00078 delete fxOwner;
00079 }
00080 delete fxLocalCommandQueue;
00081 delete fxCommandPrototype;
00082 delete fxQuitBuffer;
00083 delete fxStopBuffer;
00084 delete fxStatusBuffer;
00085 delete fxStatusMutex;
00086 }
00087
00088 void TGo4Task::SetOwner(TGo4TaskOwner* owner)
00089 {
00090 fxOwner=owner;
00091 SetMaster(dynamic_cast<TGo4Master*>(owner));
00092 SetSlave(dynamic_cast<TGo4Slave*>(owner));
00093 }
00094
00095 void TGo4Task::Start()
00096 {
00097 if(fxSlave) fxSlave->Start();
00098 }
00099
00100 void TGo4Task::Stop()
00101 {
00102 if(fxSlave) fxSlave->Stop();
00103 }
00104
00105
00106
00107
00108 void TGo4Task::Quit()
00109 {
00110 if(fxSlave) fxSlave->Quit();
00111 }
00112
00113 void TGo4Task::KillMain()
00114 {
00115 if(fxSlave) fxSlave->KillMain();
00116
00117 }
00118 void TGo4Task::RestartMain()
00119 {
00120 if(fxSlave) fxSlave->RestartMain();
00121 }
00122
00123 void TGo4Task::Terminate (Bool_t termapp)
00124 {
00125 if(fxSlave)
00126 fxSlave->Terminate(termapp);
00127 else
00128 TGo4ThreadManager::Terminate(termapp);
00129 }
00130
00131 void TGo4Task::TerminateFast ()
00132 {
00133 if(fxSlave)
00134 fxSlave->TerminateFast();
00135 else
00136 TGo4ThreadManager::TerminateFast();
00137 }
00138
00139 void TGo4Task::ExecuteString(const Text_t* command)
00140 {
00141 if(fxSlave)
00142 fxSlave->ExecuteString(command);
00143 else
00144 gROOT->ProcessLine(command);
00145
00146 }
00147
00148 TGo4TaskHandler* TGo4Task::GetTaskHandler()
00149 {
00150 return 0;
00151 }
00152
00153 TGo4BufferQueue* TGo4Task::GetCommandQueue(const char*)
00154 {
00155 return 0;
00156 }
00157
00158 TGo4BufferQueue * TGo4Task::GetStatusQueue(const char*)
00159 {
00160 return 0;
00161 }
00162 TGo4BufferQueue * TGo4Task::GetDataQueue(const char*)
00163 {
00164 return 0;
00165 }
00166
00167 TGo4TaskHandlerCommandList * TGo4Task::GetPrototype()
00168 {
00169
00170 return fxCommandPrototype;
00171 }
00172
00173 TGo4Status * TGo4Task::NextStatus(Bool_t wait)
00174 {
00175 if(!IsMaster()) return 0;
00176 TObject* obj=0;
00177 TGo4Status* stat=0;
00178 TGo4BufferQueue* statqueue=dynamic_cast<TGo4BufferQueue*> (GetStatusQueue());
00179 if(statqueue)
00180 {
00181 if(!wait && statqueue->IsEmpty())
00182 return 0;
00183 obj=statqueue->WaitObjectFromBuffer();
00184 if(obj)
00185 {
00186 if(obj->InheritsFrom("TGo4Status"))
00187 {
00188 stat= dynamic_cast<TGo4Status*>(obj);
00189 }
00190 else
00191 {
00192 TGo4Log::Debug(" !!! Master Task: NextStatus ERROR, unknown object %s from status queue!!! ",
00193 obj->GetName());
00194 delete obj;
00195 }
00196 }
00197 else
00198 {
00199 TGo4Log::Debug(" !!! Master Task NextStatus ERROR -- NULL object from data queue!!! ");
00200 }
00201 }
00202 else
00203 {
00204
00205 stat=0;
00206 }
00207 return stat;
00208 }
00209
00210
00211 TObject * TGo4Task::NextObject(Bool_t wait)
00212 {
00213 if(!IsMaster()) return 0;
00214 TObject* obj=0;
00215 TGo4BufferQueue* dataqueue=dynamic_cast<TGo4BufferQueue*> (GetDataQueue());
00216 if(dataqueue)
00217 {
00218 if(!wait && dataqueue->IsEmpty())
00219 return 0;
00220 obj=dataqueue->WaitObjectFromBuffer();
00221
00222 }
00223 else
00224 {
00225
00226 obj=0;
00227 }
00228 return obj;
00229 }
00230 void TGo4Task::AddUserCommand(TGo4Command* com)
00231 {
00232
00233 fxCommandPrototype->AddCommand(com);
00234 }
00235 void TGo4Task::AddUserCommandList(TGo4CommandProtoList * comlist)
00236 {
00237 if(comlist)
00238 {
00239 *fxCommandPrototype += *comlist;
00240
00241 delete comlist;
00242 comlist=0;
00243 }
00244 }
00245
00246 void TGo4Task::SendObject(TObject * obj, const char* receiver)
00247 {
00248 if(IsMaster()) return;
00249 if(obj)
00250 {
00251
00252 TGo4BufferQueue * dataq=GetDataQueue(receiver);
00253 if(dataq)
00254 {
00255
00256
00257 dataq->AddBufferFromObject(obj);
00258 }
00259 else
00260 {
00261 TGo4Log::Debug(" !!! Task - ERROR sending object - no data queue !!! ");
00262 }
00263 }
00264 else
00265 {
00266
00267 SendStatusMessage(2, kTRUE, "Task - object not found");
00268 }
00269 }
00270
00271 void TGo4Task::SendStatus(TGo4Status * stat, const char* receiver)
00272 {
00273 if(IsMaster()) return ;
00274 if(stat)
00275 {
00276
00277 TGo4BufferQueue * statq=GetStatusQueue(receiver);
00278 if(statq)
00279 {
00280 TGo4Log::Debug(" Task - sending status %s ", stat->ClassName());
00281 statq->AddBufferFromObject(stat);
00282 }
00283 else
00284 {
00285 TGo4Log::Debug(" !!! Task - ERROR sending status: no status queue !!! ");
00286 }
00287 }
00288 else
00289 {
00290
00291
00292 }
00293
00294
00295 }
00296 void TGo4Task::SendStatusBuffer()
00297 {
00298 if(IsMaster()) return;
00299 TGo4LockGuard statguard(fxStatusMutex);
00300 TGo4Log::Debug(" Task - sending status buffer ");
00301 TGo4BufferQueue * statq=GetStatusQueue();
00302 if(statq) statq->AddBuffer(fxStatusBuffer,kTRUE);
00303 }
00304 void TGo4Task::SendStatusMessage(Int_t level, Bool_t printout, const char* text,...)
00305 {
00306 if(IsMaster()) return;
00307 Int_t lbuflen=256;
00308
00309 char txtbuf[lbuflen];
00310 va_list args;
00311 va_start(args, text);
00312 vsnprintf(txtbuf, lbuflen, text, args);
00313 va_end(args);
00314
00315 const char* dest;
00316 char* curs=txtbuf;
00317 TString receiver=txtbuf;
00318 Ssiz_t pos=receiver.Index("::",2,0,TString::kExact);
00319 if(pos!=kNPOS)
00320 {
00321
00322 receiver.Resize(pos);
00323 dest=receiver.Data();
00324 curs += ((size_t) pos);
00325 curs +=2;
00326 }
00327 else
00328 {
00329 dest=0;
00330 }
00331
00332 Bool_t previousmode=TGo4Log::IsOutputEnabled();
00333 TGo4Log::OutputEnable(printout);
00334 const char* go4mess=TGo4Log::Message(level,curs);
00335 TGo4Log::OutputEnable(previousmode);
00336 if(level>0 && go4mess!=0)
00337 {
00338
00339 TGo4Status* message= new TGo4Status(go4mess);
00340 SendStatus(message,dest);
00341 delete message;
00342 }
00343 }
00344 void TGo4Task::UpdateStatusBuffer()
00345 {
00346 if(IsMaster()) return;
00347 TGo4LockGuard statguard(fxStatusMutex);
00348 TFile *filsav = gFile;
00349 gFile = 0;
00350 TGo4TaskStatus* state=0;
00351 if(fxSlave)
00352 state=fxSlave->CreateStatus();
00353 else
00354 state=CreateStatus();
00355 fxStatusBuffer->Reset();
00356 fxStatusBuffer->InitMap();
00357 fxStatusBuffer->WriteObject(state);
00358 gFile = filsav;
00359 delete state;
00360 }
00361 TGo4Command* TGo4Task::NextCommand()
00362 {
00363 if(IsMaster()) return 0;
00364 TGo4Command* com=0;
00365 TObject* obj=0;
00366 TGo4BufferQueue * comq=GetCommandQueue();
00367 if(comq==0) return 0;
00368 if(!comq->IsEmpty() || (fxSlave!=0 && !fxSlave->MainIsRunning() ) )
00369 {
00370
00371
00372 obj=comq->WaitObjectFromBuffer();
00373 if(obj)
00374 {
00375 if(obj->InheritsFrom("TGo4Command"))
00376 {
00377 com= dynamic_cast<TGo4Command*>(obj);
00378 com->SetTaskName("current");
00379 com->SetMode(kGo4ComModeController);
00380 }
00381 else
00382 {
00383 delete obj;
00384 }
00385 }
00386 else
00387 {
00388
00389 }
00390 }
00391 else
00392 {
00393 com=0;
00394 }
00395 return com;
00396 }
00397 Int_t TGo4Task::Initialization()
00398 {
00399
00400 Int_t rev=-1;
00401 if(fbInitDone)
00402
00403 {
00404 rev=0;
00405 }
00406 else
00407 {
00408 if(fxCommandPrototype==0)
00409 {
00410 if(fxMaster)
00411 {
00412 fxCommandPrototype=fxMaster->CreateCommandList();
00413 TGo4Log::Debug(" Task -- command list is created from Master factory");
00414 }
00415 else
00416 {
00417 fxCommandPrototype=CreateCommandList();
00418 TGo4Log::Debug(" Task -- command list is created from Task factory");
00419 }
00420 }
00421 rev=TGo4ThreadManager::Initialization();
00422 fxWorkHandler->Start(GetCommanderName());
00423 if(fxSlave) fxSlave->Initialization();
00424 }
00425 return rev;
00426 }
00427
00428 void TGo4Task::UpdateStatus(TGo4TaskStatus* state)
00429 {
00430 TGo4TaskHandlerStatus* taskhandlerstatus=0;
00431 TGo4TaskHandler* th=GetTaskHandler();
00432 if(th) taskhandlerstatus=th->CreateStatus();
00433 state->SetTaskHandlerStatus(taskhandlerstatus);
00434 state->SetFlags(fbAppBlocking, fbAutoCreate, fbAutoStart, fbTerminating, fbInitDone);
00435 }
00436
00437 TGo4TaskStatus* TGo4Task::CreateStatus()
00438 {
00439 TGo4TaskStatus* stat=new TGo4TaskStatus((Text_t*) GetName());
00440 UpdateStatus(stat);
00441 return stat;
00442 }
00443
00444
00445 Bool_t TGo4Task::SubmitCommand(const char* name)
00446 {
00447 if(!strcmp(name,"THEMQuit"))
00448 {
00449 return (SubmitEmergencyCommand(kComQuit));
00450 }
00451 else if (!strcmp(name,"THEMKill"))
00452 {
00453 return (SubmitEmergencyCommand(kComKillMain));
00454 }
00455 else if (!strcmp(name,"THEMRestart"))
00456 {
00457 return (SubmitEmergencyCommand(kComRestartMain));
00458 }
00459 else
00460 {
00461 TGo4Command* com=MakeCommand(name);
00462 return (SubmitCommand(com)) ;
00463 }
00464 }
00465 Bool_t TGo4Task::SubmitEmergencyCommand(Go4EmergencyCommand_t val)
00466 {
00467 TGo4BufferQueue* queue=GetCommandQueue();
00468 if(queue!=0)
00469 {
00470
00471 if(val==kComQuit)
00472 {
00473
00474 queue->AddBuffer(fxQuitBuffer,kTRUE);
00475 }
00476 else
00477 {
00478 TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00479 queue->AddBuffer(commandbuffer);
00480 }
00481
00482 return kTRUE;
00483 }
00484 return kFALSE;
00485 }
00486
00487 Bool_t TGo4Task::SubmitEmergencyData(Go4EmergencyCommand_t val, const char* receiver)
00488 {
00489 TGo4BufferQueue* queue=GetDataQueue(receiver);
00490 if(queue!=0)
00491 {
00492
00493 if(val==kComQuit)
00494 {
00495
00496 queue->AddBuffer(fxQuitBuffer,kTRUE);
00497 }
00498 else
00499 {
00500 TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00501 queue->AddBuffer(commandbuffer);
00502
00503 }
00504 return kTRUE;
00505 }
00506 return kFALSE;
00507 }
00508
00509 Bool_t TGo4Task::SubmitCommand(TGo4Command* com)
00510 {
00511 if (com==0) return kFALSE;
00512
00513 Bool_t rev=kTRUE;
00514 if(com->IsLocal())
00515 SubmitLocalCommand(com);
00516 else {
00517
00518 TGo4BufferQueue* queue=GetCommandQueue();
00519 if(queue!=0) {
00520
00521 TGo4LockGuard mainlock;
00522
00523 queue->AddBufferFromObject(com);
00524 } else
00525 rev = kFALSE;
00526 delete com;
00527 }
00528 return rev;
00529 }
00530
00531 TGo4TaskHandlerCommandList* TGo4Task::CreateCommandList()
00532 {
00533 return (new TGo4TaskHandlerCommandList("Go4ServerTaskDefaultCommandList") );
00534 }
00535
00536 TGo4Command* TGo4Task::MakeCommand(const char* name)
00537 {
00538 TGo4LockGuard mainlock;
00539 return ( fxCommandPrototype->MakeCommand(name) );
00540 }
00541
00542 Bool_t TGo4Task::SubmitLocalCommand(TGo4Command* com)
00543 {
00544 if(com==0) return kFALSE;
00545 com->SetMode(kGo4ComModeController);
00546 fxWorkHandler->Start(GetCommanderName());
00547 TGo4ObjectQueue* lqueue=GetLocalCommandQueue();
00548 if(lqueue!=0)
00549 lqueue->AddObject(com);
00550 else {
00551 delete com;
00552 return kFALSE;
00553 }
00554 return kTRUE;
00555 }
00556
00557 void TGo4Task::WakeCommandQueue(Int_t id)
00558 {
00559 if(GetTaskHandler() && GetTaskHandler()->IsAborting())
00560 {
00561
00562 return;
00563 }
00564
00565 TGo4Command* com=new TGo4Command("dummy","this wakes up queue",id);
00566 SubmitCommand(com);
00567 com=new TGo4Command("dummy","this wakes up queue",id);
00568 SubmitLocalCommand(com);
00569
00570 }
00571
00572 void TGo4Task::GetStatus()
00573 {
00574 TGo4Log::Debug(" Task ''%s'' Send Status to Command Master ",GetName());
00575 TGo4BufferQueue* queue = GetStatusQueue();
00576 if(queue==0) return;
00577 {
00578 TGo4LockGuard mainguard;
00579
00580 TGo4TaskStatus* state=CreateStatus();
00581 queue->AddBufferFromObject(state);
00582 }
00583 }
00584
00585 Int_t TGo4Task::StartWorkThreads()
00586 {
00587 fbWorkIsStopped=kFALSE;
00588 if(fxOwner)
00589 return (fxOwner->StartWorkThreads());
00590 else
00591 return 0;
00592 }
00593
00594 Int_t TGo4Task::StopWorkThreads()
00595 {
00596 fbWorkIsStopped=kTRUE;
00597 if(fxOwner)
00598 return(fxOwner->StopWorkThreads());
00599 else
00600 return 0;
00601 }
00602
00603 void TGo4Task::SendStopBuffers(const char* taskname)
00604 {
00605 TGo4TaskHandler* th=GetTaskHandler();
00606 if(th==0) return;
00607 if(th->IsAborting())
00608 {
00609
00610 return;
00611 }
00612
00613 if(IsMaster())
00614 {
00615
00616 TGo4BufferQueue * comq=GetCommandQueue(taskname);
00617 if(comq)
00618 {
00619 comq->AddBuffer(fxStopBuffer,kTRUE);
00620 }
00621 }
00622 else
00623 {
00624
00625 TGo4BufferQueue * dataq=GetDataQueue(taskname);
00626 if(dataq)
00627 {
00628 dataq->AddBuffer(fxStopBuffer,kTRUE);
00629 }
00630 TGo4BufferQueue * statq=GetStatusQueue(taskname);
00631 if(statq)
00632 {
00633 statq->AddBuffer(fxStopBuffer,kTRUE);
00634 }
00635 }
00636 }
00637
00638 ClassImp(TGo4Task)
00639