Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

/Go4TaskHandler/TGo4Task.cxx

Go to the documentation of this file.
00001 //---------------------------------------------------------------
00002 //        Go4 Release Package v2.10-5 (build 21005) 
00003 //                      03-Nov-2005
00004 //---------------------------------------------------------------
00005 //       The GSI Online Offline Object Oriented (Go4) Project
00006 //       Experiment Data Processing at DVEE department, GSI
00007 //---------------------------------------------------------------
00008 //
00009 //Copyright (C) 2000- Gesellschaft f. Schwerionenforschung, GSI
00010 //                    Planckstr. 1, 64291 Darmstadt, Germany
00011 //Contact:            http://go4.gsi.de
00012 //----------------------------------------------------------------
00013 //This software can be used under the license agreements as stated
00014 //in Go4License.txt file which is part of the distribution.
00015 //----------------------------------------------------------------
00016 /* Generated by Together */
00017 
00018 #include "TGo4Task.h"
00019 
00020 #include "TFile.h"
00021 #include "TBuffer.h"
00022 #include "TMutex.h"
00023 
00024 #include "Go4Log/TGo4Log.h"
00025 #include "Go4LockGuard/TGo4LockGuard.h"
00026 #include "Go4CommandsBase/TGo4CommandInvoker.h"
00027 #include "Go4CommandsTaskHandler/TGo4TaskHandlerCommandList.h"
00028 #include "TGo4Master.h"
00029 #include "TGo4Slave.h"
00030 #include "TGo4LocalCommandRunnable.h"
00031 #include "Go4Queue/TGo4ObjectQueue.h"
00032 #include "Go4Queue/TGo4BufferQueue.h"
00033 #include "TGo4TaskOwner.h"
00034 #include <Rtypes.h>
00035 #include "Go4CommandsTaskHandler/TGo4TaskHandlerCommandList.h"
00036 #include "Go4Socket/TGo4Socket.h"
00037 #include <TMutex.h>
00038 #include <TBuffer.h>
00039 
00040 const Int_t TGo4Task::fgiTERMID=999;
00041 
00042 TGo4Task::TGo4Task(const char* name, Bool_t blockingmode,
00043                                Bool_t autostart,
00044                                Bool_t autocreate,
00045                                Bool_t ismaster)
00046   : TGo4ThreadManager(name,blockingmode, autostart,autocreate),
00047   fbCommandMaster(ismaster), fxMaster(0), fxSlave(0),fxOwner(0),
00048   fbWorkIsStopped(kFALSE),fxStopBuffer(0),fxQuitBuffer(0)
00049 {
00050 fxCommandPrototype=0; 
00051 fxStatusBuffer= new TBuffer(TBuffer::kWrite);
00052 fxStatusMutex= new TMutex;
00053 fxStopBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComCloseInput);
00054 fxQuitBuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) kComQuit);
00055 
00056 TGo4CommandInvoker::Instance(); // make sure we have an invoker instance!
00057 TGo4CommandInvoker::Register("NoReceiver", this); // for simple command test
00058 TGo4CommandInvoker::Register("Task",this); // register as command receiver at the global invoker
00059   
00060 
00061 // local command queue:
00062 fxLocalCommandQueue = new TGo4ObjectQueue("localcommands");
00063 
00064 // local command runnable:
00065 Text_t nomen[TGo4ThreadManager::fguTEXTLENGTH];
00066 snprintf(nomen,TGo4ThreadManager::fguTEXTLENGTH-1, "LocalCommandRunnable of %s", name);
00067 TGo4LocalCommandRunnable* commander = new TGo4LocalCommandRunnable(nomen,this);
00068 snprintf(nomen,TGo4ThreadManager::fguTEXTLENGTH-1, "COMMANDER-%s", name);
00069 fxCommanderName=nomen;
00070 GetWorkHandler()->NewThread(GetCommanderName(), commander);
00071 }
00072 
00073 TGo4Task::~TGo4Task()
00074 {
00075    if(fxOwner) 
00076       {
00077          fxOwner->SetTask(0,kFALSE); // on termination from threadmanager, we take over responsibility for cleanup
00078          delete fxOwner;   
00079       }
00080    delete fxLocalCommandQueue;
00081    delete fxCommandPrototype;
00082    delete fxQuitBuffer;
00083    delete fxStopBuffer;
00084    delete fxStatusBuffer;
00085    delete fxStatusMutex;   
00086 }
00087 
00088 void TGo4Task::SetOwner(TGo4TaskOwner* owner)
00089 {
00090    fxOwner=owner;    
00091    SetMaster(dynamic_cast<TGo4Master*>(owner));
00092    SetSlave(dynamic_cast<TGo4Slave*>(owner));
00093 }
00094 
00095 void TGo4Task::Start()
00096 {
00097    if(fxSlave) fxSlave->Start();
00098 }
00099 
00100 void TGo4Task::Stop()
00101 {
00102    if(fxSlave) fxSlave->Stop();
00103 }
00104 
00105 
00106 
00107 
00108 void TGo4Task::Quit()
00109 {
00110    if(fxSlave) fxSlave->Quit();
00111 }
00112 
00113 void TGo4Task::KillMain()
00114 {
00115    if(fxSlave) fxSlave->KillMain();
00116 
00117 }
00118 void TGo4Task::RestartMain()
00119 {
00120    if(fxSlave) fxSlave->RestartMain();
00121 }
00122 
00123 void TGo4Task::Terminate (Bool_t termapp)
00124 {
00125    if(fxSlave) 
00126       fxSlave->Terminate(termapp);   
00127    else
00128       TGo4ThreadManager::Terminate(termapp);
00129 }    
00130 
00131 void TGo4Task::TerminateFast ()
00132 {
00133    if(fxSlave) 
00134       fxSlave->TerminateFast();    
00135    else
00136       TGo4ThreadManager::TerminateFast();
00137 }
00138 
00139 void TGo4Task::ExecuteString(const Text_t* command)
00140 {
00141    if(fxSlave) 
00142       fxSlave->ExecuteString(command);
00143    else
00144       gROOT->ProcessLine(command);
00145 
00146 }
00147 
00148 TGo4TaskHandler* TGo4Task::GetTaskHandler()
00149 {
00150    return 0;  
00151 }
00152 
00153 TGo4BufferQueue* TGo4Task::GetCommandQueue(const char*)
00154 {
00155    return 0; // please override
00156 }
00157 
00158 TGo4BufferQueue * TGo4Task::GetStatusQueue(const char*)
00159 {
00160    return 0; // please override
00161 }
00162 TGo4BufferQueue * TGo4Task::GetDataQueue(const char*)
00163 {
00164   return 0; // please override
00165 }
00166 
00167 TGo4TaskHandlerCommandList * TGo4Task::GetPrototype()
00168 {
00169 // keep this method for compatibility reasons, user should not need access to list
00170     return fxCommandPrototype;
00171 }
00172 
00173 TGo4Status * TGo4Task::NextStatus(Bool_t wait)
00174 {
00175 if(!IsMaster()) return 0;
00176    TObject* obj=0;
00177    TGo4Status* stat=0;
00178    TGo4BufferQueue* statqueue=dynamic_cast<TGo4BufferQueue*> (GetStatusQueue());
00179    if(statqueue)
00180       {
00181          if(!wait && statqueue->IsEmpty())
00182             return 0;   // polling mode for timer: we do not go into condition wait!
00183          obj=statqueue->WaitObjectFromBuffer();
00184          if(obj)
00185            {
00186               if(obj->InheritsFrom("TGo4Status"))
00187                  {
00188                         stat= dynamic_cast<TGo4Status*>(obj);
00189                  }
00190               else
00191                  {
00192                     TGo4Log::Debug(" !!! Master Task: NextStatus ERROR, unknown object %s from status queue!!! ",
00193                                 obj->GetName());
00194                     delete obj;
00195                  }   // if(obj->InheritsFrom("TGo4Status"))
00196            }
00197          else
00198             {
00199                TGo4Log::Debug(" !!! Master Task NextStatus ERROR -- NULL object from data queue!!! ");
00200             } // if(obj)
00201       }
00202    else //if(statqueue)
00203       {
00204              //TGo4Log::Debug(" !!! Master Task NextStatus ERROR -- no data queue!!! ");
00205              stat=0;
00206       }
00207    return stat;
00208 }
00209 
00210 
00211 TObject * TGo4Task::NextObject(Bool_t wait)
00212 {
00213 if(!IsMaster()) return 0;
00214    TObject* obj=0;
00215    TGo4BufferQueue* dataqueue=dynamic_cast<TGo4BufferQueue*> (GetDataQueue());
00216    if(dataqueue)
00217       {
00218          if(!wait && dataqueue->IsEmpty())
00219             return 0;   // polling mode for timer: we do not go into condition wait!
00220          obj=dataqueue->WaitObjectFromBuffer(); // wait for buffer and stream object
00221 
00222       }
00223    else //if(dataqueue)
00224       {
00225              //TGo4Log::Debug(" !!! Master Task NextObject ERROR -- no data queue!!! ");
00226          obj=0;
00227       }
00228    return obj;
00229 }
00230 void TGo4Task::AddUserCommand(TGo4Command* com)
00231 {
00232 
00233    fxCommandPrototype->AddCommand(com);
00234 }
00235 void TGo4Task::AddUserCommandList(TGo4CommandProtoList * comlist)
00236 {
00237    if(comlist)
00238       {
00239          *fxCommandPrototype += *comlist;
00240             // operator+= of TGo4CommandProtolist puts new commands into old list
00241          delete comlist;
00242          comlist=0;
00243       }
00244 }
00245 
00246 void TGo4Task::SendObject(TObject * obj, const char* receiver)
00247 {
00248    if(IsMaster()) return;
00249    if(obj)
00250       {
00251          // object exists, put it into data queue
00252          TGo4BufferQueue * dataq=GetDataQueue(receiver);         
00253          if(dataq)
00254             {
00255 //               TGo4Log::Debug(" Task - sending object: %s ",obj->GetName());
00256                //SendStatusMessage(1, kTRUE,"Task - sending object: %s ",obj->GetName());
00257                dataq->AddBufferFromObject(obj);
00258             }
00259          else
00260             {
00261                TGo4Log::Debug(" !!! Task - ERROR sending object - no data queue !!! ");
00262             }
00263       }
00264    else
00265       {
00266          // object not found, send error message through status
00267          SendStatusMessage(2, kTRUE, "Task - object not found");
00268       }
00269 }
00270 
00271 void TGo4Task::SendStatus(TGo4Status * stat, const char* receiver)
00272 {
00273    if(IsMaster()) return ;
00274    if(stat)
00275       {
00276          // object exists, put it into status queue         
00277          TGo4BufferQueue * statq=GetStatusQueue(receiver);
00278          if(statq)
00279             {
00280                TGo4Log::Debug(" Task - sending status %s ", stat->ClassName());
00281                statq->AddBufferFromObject(stat);
00282             }
00283          else
00284             {
00285                TGo4Log::Debug(" !!! Task - ERROR sending status: no status queue !!! ");
00286             }
00287       }
00288    else
00289       {
00290          // TGo4Log::Debug(" !!! Task - ERROR sending status: no such object!!! ");
00291 
00292       }
00293 
00294 
00295 }
00296 void TGo4Task::SendStatusBuffer()
00297 {
00298 if(IsMaster()) return;
00299    TGo4LockGuard statguard(fxStatusMutex); // do not send during buffer update
00300    TGo4Log::Debug(" Task - sending status buffer ");
00301    TGo4BufferQueue * statq=GetStatusQueue();
00302    if(statq) statq->AddBuffer(fxStatusBuffer,kTRUE);
00303 }
00304 void TGo4Task::SendStatusMessage(Int_t level, Bool_t printout, const char* text,...)
00305 {
00306 if(IsMaster()) return;
00307    Int_t lbuflen=256;
00308    // put potential printf arguments in text:
00309    char txtbuf[lbuflen];
00310    va_list args;
00311    va_start(args, text);
00312    vsnprintf(txtbuf, lbuflen, text, args);
00313    va_end(args);
00314    // figure out here possible destination for message in string:
00315    const char* dest;
00316    char* curs=txtbuf;
00317    TString receiver=txtbuf;
00318    Ssiz_t pos=receiver.Index("::",2,0,TString::kExact);
00319    if(pos!=kNPOS)
00320       {
00321        // before this we have receiver:
00322        receiver.Resize(pos);
00323        dest=receiver.Data();  
00324        curs += ((size_t) pos);
00325        curs +=2; // skip separator  
00326       }
00327    else
00328       {
00329          dest=0;   
00330       }
00331    //cout <<"SSSSSSendStatusMessage has receiver "<<dest <<" and message "<<curs << endl;
00332    Bool_t previousmode=TGo4Log::IsOutputEnabled();
00333    TGo4Log::OutputEnable(printout); // override the messaging state
00334    const char* go4mess=TGo4Log::Message(level,curs);
00335    TGo4Log::OutputEnable(previousmode); // restore old state of messageing
00336    if(level>0 && go4mess!=0)
00337    {
00338       // do not send debug-level output to gui, and do not send supressed messages as empty string!
00339       TGo4Status* message= new TGo4Status(go4mess);
00340       SendStatus(message,dest);
00341       delete message;
00342    }
00343 }
00344 void TGo4Task::UpdateStatusBuffer()
00345 {
00346 if(IsMaster()) return;
00347   TGo4LockGuard statguard(fxStatusMutex); // do not update during sending
00348    TFile *filsav = gFile;
00349    gFile = 0;
00350    TGo4TaskStatus* state=0;
00351    if(fxSlave)
00352       state=fxSlave->CreateStatus();
00353    else
00354       state=CreateStatus();
00355    fxStatusBuffer->Reset();
00356    fxStatusBuffer->InitMap();
00357    fxStatusBuffer->WriteObject(state);
00358    gFile = filsav;
00359    delete state; // avoid memory leak!!
00360 }
00361 TGo4Command* TGo4Task::NextCommand()
00362 {
00363 if(IsMaster()) return 0;
00364    TGo4Command* com=0;
00365    TObject* obj=0;
00366    TGo4BufferQueue * comq=GetCommandQueue();
00367    if(comq==0) return 0;
00368    if(!comq->IsEmpty() || (fxSlave!=0 && !fxSlave->MainIsRunning() ) )
00369       {
00370          // put new command out of queue
00371          // or wait for command if analysis is stopped
00372          obj=comq->WaitObjectFromBuffer();
00373          if(obj)
00374            {
00375               if(obj->InheritsFrom("TGo4Command"))
00376                  {
00377                      com= dynamic_cast<TGo4Command*>(obj);
00378                      com->SetTaskName("current");
00379                      com->SetMode(kGo4ComModeController); 
00380                  }
00381               else
00382                  {
00383                     delete obj;
00384                  }
00385            }
00386          else
00387             {
00388                //TGo4Log::Debug(" !!! Slave ERROR -- NULL object from command queue!!! ");
00389             }
00390       }
00391    else //if(!fxCommandQ->IsEmpty() || !AnalysisIsRunning())
00392       {
00393          com=0;
00394       }
00395    return com;
00396 }
00397 Int_t TGo4Task::Initialization()
00398 {
00399    // this method will be called by the application control timer every timerperiod
00400    Int_t rev=-1;
00401    if(fbInitDone)
00402       // already initialized
00403       {
00404        rev=0;
00405       } 
00406    else
00407       {
00408          if(fxCommandPrototype==0)
00409             {
00410                if(fxMaster)               
00411                   {
00412                      fxCommandPrototype=fxMaster->CreateCommandList(); // use factory method
00413                      TGo4Log::Debug(" Task --  command list is created from Master factory");
00414                   }
00415                else
00416                   {
00417                      fxCommandPrototype=CreateCommandList(); 
00418                      TGo4Log::Debug(" Task --  command list is created from Task factory");
00419                   }
00420             }
00421          rev=TGo4ThreadManager::Initialization(); // this will launch threads, etc.
00422          fxWorkHandler->Start(GetCommanderName()); // for non autostart mode
00423          if(fxSlave) fxSlave->Initialization();
00424       }// else if(fbInitDone)
00425     return rev;
00426 }
00427 
00428 void TGo4Task::UpdateStatus(TGo4TaskStatus* state)
00429 {
00430    TGo4TaskHandlerStatus* taskhandlerstatus=0;
00431    TGo4TaskHandler* th=GetTaskHandler();
00432    if(th) taskhandlerstatus=th->CreateStatus();
00433    state->SetTaskHandlerStatus(taskhandlerstatus);
00434    state->SetFlags(fbAppBlocking, fbAutoCreate, fbAutoStart, fbTerminating, fbInitDone);
00435 }
00436 
00437 TGo4TaskStatus* TGo4Task::CreateStatus()
00438 {
00439    TGo4TaskStatus*   stat=new TGo4TaskStatus((Text_t*) GetName());
00440    UpdateStatus(stat); // set the internals
00441    return stat;
00442 }
00443 
00444 
00445 Bool_t TGo4Task::SubmitCommand(const char* name)
00446 {
00447    if(!strcmp(name,"THEMQuit"))
00448       {
00449          return (SubmitEmergencyCommand(kComQuit));
00450       }
00451    else if (!strcmp(name,"THEMKill"))
00452       {
00453          return (SubmitEmergencyCommand(kComKillMain));
00454       }
00455    else if (!strcmp(name,"THEMRestart"))
00456       {
00457          return (SubmitEmergencyCommand(kComRestartMain));
00458       }
00459    else
00460       {
00461          TGo4Command* com=MakeCommand(name);
00462          return (SubmitCommand(com)) ;
00463       }
00464 }
00465 Bool_t TGo4Task::SubmitEmergencyCommand(Go4EmergencyCommand_t val)
00466 {
00467 TGo4BufferQueue* queue=GetCommandQueue();
00468 if(queue!=0)
00469    {
00470       // we have an active command queue...
00471          if(val==kComQuit)
00472             {
00473                // quit command might be send from thread. use preallocated buffer!
00474                queue->AddBuffer(fxQuitBuffer,kTRUE);
00475             }
00476          else
00477             {   
00478                TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00479                queue->AddBuffer(commandbuffer); // put command into queue
00480             }
00481          
00482          return kTRUE;
00483    }
00484 return kFALSE;
00485 }
00486 
00487 Bool_t TGo4Task::SubmitEmergencyData(Go4EmergencyCommand_t val, const char* receiver)
00488 {
00489 TGo4BufferQueue* queue=GetDataQueue(receiver);
00490 if(queue!=0)
00491    {
00492       // we have an active data queue...
00493           if(val==kComQuit)
00494             {
00495                // quit data is send from thread. use preallocated buffer!
00496                queue->AddBuffer(fxQuitBuffer,kTRUE);
00497             }
00498          else
00499             {  
00500                TBuffer* commandbuffer=TGo4BufferQueue::CreateValueBuffer((UInt_t) val);
00501                queue->AddBuffer(commandbuffer); // put command into queue
00502                //cout <<"UUUUUUUU SubmitEmergencyData to "<<receiver << endl;
00503             }
00504          return kTRUE;
00505    }
00506 return kFALSE;
00507 }
00508 
00509 Bool_t TGo4Task::SubmitCommand(TGo4Command* com)
00510 {
00511    if (com==0) return kFALSE; 
00512     
00513    Bool_t rev=kTRUE;
00514    if(com->IsLocal())
00515        SubmitLocalCommand(com);
00516    else {
00517      // command for remote client, put into actual client queue
00518      TGo4BufferQueue* queue=GetCommandQueue();
00519      if(queue!=0) {
00520         // we have an active command queue...
00521         TGo4LockGuard mainlock; // protect the streamer!
00522         //cout << "Mainlock acquired by server task: SubmitCommand"<< endl;
00523         queue->AddBufferFromObject(com); // put command into queue
00524      } else 
00525         rev = kFALSE;
00526      delete com; // buffer queue does not adopt com, we delete it
00527    } 
00528    return rev;
00529 }
00530 
00531 TGo4TaskHandlerCommandList* TGo4Task::CreateCommandList()
00532 {
00533    return (new TGo4TaskHandlerCommandList("Go4ServerTaskDefaultCommandList") );
00534 }
00535 
00536 TGo4Command* TGo4Task::MakeCommand(const char* name)
00537 {
00538    TGo4LockGuard mainlock;
00539    return ( fxCommandPrototype->MakeCommand(name) );
00540 }
00541 
00542 Bool_t TGo4Task::SubmitLocalCommand(TGo4Command* com)
00543 {
00544    if(com==0) return kFALSE;
00545    com->SetMode(kGo4ComModeController);
00546    fxWorkHandler->Start(GetCommanderName()); // for non autostart mode
00547    TGo4ObjectQueue* lqueue=GetLocalCommandQueue();
00548    if(lqueue!=0) 
00549       lqueue->AddObject(com); // object queue adopts command
00550    else {
00551       delete com;
00552       return kFALSE;
00553    }
00554    return kTRUE;
00555 }
00556 
00557 void TGo4Task::WakeCommandQueue(Int_t id)
00558 {
00559 if(GetTaskHandler() && GetTaskHandler()->IsAborting())
00560    {
00561       //cout <<"Do not WakeCommandQueue() when aborting taskhandler" << endl;
00562       return;
00563    }
00564 // put dummy buffer to command queue. This will wake up the main thread from command wait.
00565 TGo4Command* com=new TGo4Command("dummy","this wakes up queue",id);
00566 SubmitCommand(com); // wake up main command queue (to taskhandler) 
00567 com=new TGo4Command("dummy","this wakes up queue",id);
00568 SubmitLocalCommand(com); // wake up local command queue   
00569 // note: command is owned by submit command after submit!
00570 }
00571 
00572 void TGo4Task::GetStatus()
00573 {
00574    TGo4Log::Debug(" Task ''%s'' Send Status to Command Master ",GetName());
00575    TGo4BufferQueue* queue = GetStatusQueue();
00576    if(queue==0) return;
00577    {
00578       TGo4LockGuard mainguard;
00579 //         cout << "Mainlock acquired by clienttask: GetStatus"<< endl;
00580          TGo4TaskStatus* state=CreateStatus();
00581          queue->AddBufferFromObject(state);
00582    }
00583 }
00584 
00585 Int_t TGo4Task::StartWorkThreads()
00586 {
00587 fbWorkIsStopped=kFALSE;
00588 if(fxOwner) 
00589    return (fxOwner->StartWorkThreads()); 
00590 else
00591    return 0;
00592 }
00593 
00594 Int_t TGo4Task::StopWorkThreads()
00595 {
00596 fbWorkIsStopped=kTRUE;
00597 if(fxOwner) 
00598   return(fxOwner->StopWorkThreads()); 
00599 else
00600    return 0;
00601 }
00602 
00603 void TGo4Task::SendStopBuffers(const char* taskname)
00604 {
00605 TGo4TaskHandler* th=GetTaskHandler();
00606 if(th==0) return; 
00607 if(th->IsAborting())
00608    {
00609       //cout <<"Do not SendStopBuffers() when aborting taskhandler" << endl;
00610       return;
00611    }
00612 
00613 if(IsMaster())
00614    {
00615      //cout <<"SSSSSSSSs SendStopBuffers() as master" << endl; 
00616      TGo4BufferQueue * comq=GetCommandQueue(taskname);   
00617      if(comq)
00618       {
00619          comq->AddBuffer(fxStopBuffer,kTRUE);  
00620       } 
00621    }
00622 else
00623    {
00624       //cout <<"SSSSSSSSs SendStopBuffers() as slave, sending to "<<taskname << endl;
00625       TGo4BufferQueue * dataq=GetDataQueue(taskname);   
00626       if(dataq)
00627          {
00628             dataq->AddBuffer(fxStopBuffer,kTRUE);  
00629          }
00630      TGo4BufferQueue * statq=GetStatusQueue(taskname);   
00631      if(statq)
00632       {
00633          statq->AddBuffer(fxStopBuffer,kTRUE);  
00634       } 
00635    }//if(IsMaster())
00636 }
00637 
00638 ClassImp(TGo4Task)
00639 //----------------------------END OF GO4 SOURCE FILE ---------------------

Generated on Tue Nov 8 10:56:06 2005 for Go4-v2.10-5 by doxygen1.2.15