Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members

TGo4TaskManager.cxx

Go to the documentation of this file.
00001 //-------------------------------------------------------------
00002 //        Go4 Release Package v3.04-01 (build 30401)
00003 //                      28-November-2008
00004 //---------------------------------------------------------------
00005 //   The GSI Online Offline Object Oriented (Go4) Project
00006 //   Experiment Data Processing at EE department, GSI
00007 //---------------------------------------------------------------
00008 //
00009 //Copyright (C) 2000- Gesellschaft f. Schwerionenforschung, GSI
00010 //                    Planckstr. 1, 64291 Darmstadt, Germany
00011 //Contact:            http://go4.gsi.de
00012 //----------------------------------------------------------------
00013 //This software can be used under the license agreements as stated
00014 //in Go4License.txt file which is part of the distribution.
00015 //----------------------------------------------------------------
00016 #include "TGo4TaskManager.h"
00017 
00018 #include "Riostream.h"
00019 
00020 #include "TObjArray.h"
00021 #include "TMutex.h"
00022 
00023 #include "TGo4Log.h"
00024 #include "TGo4LockGuard.h"
00025 #include "TGo4Thread.h"
00026 #include "TGo4Socket.h"
00027 #include "TGo4TaskHandler.h"
00028 #include "TGo4TerminateException.h"
00029 #include "TGo4ServerTask.h"
00030 
00031 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360; // wait cycles 180
00032 
00033 const UInt_t TGo4TaskManager::fguDISCONTIME=500; // time in ms 1000
00034 
00035 TGo4TaskManager::TGo4TaskManager(const char* name,
00036                                  TGo4ServerTask * server,
00037                                  UInt_t negotiationport,
00038                                  Bool_t createconnector)
00039 : TNamed(name,"This is a Go4TaskManager"),
00040    fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
00041 {
00042    fxServer=server;
00043  // set port number for the client server negotiation channel:
00044    if(negotiationport==0)
00045       {
00046          // default: use taskhandler intrinsic port number
00047          fuNegotiationPort=TGo4TaskHandler::fguCONNECTORPORT;
00048       }
00049    else
00050       {
00051          // use dynamic port number given by main program
00052          fuNegotiationPort=negotiationport;
00053       }
00054 
00055    fxListMutex=new TMutex(kTRUE);
00056    fxTaskList=new TObjArray;
00057    fxTaskIter=fxTaskList->MakeIterator();
00058    fxTransport=0;
00059    if(createconnector)
00060       {
00061        // this mode is for server task created on the fly
00062        // connector should be available immediately, independent of timer connect!
00063        TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
00064        fxTransport=new TGo4Socket(kFALSE); // use raw transport for negotiations
00065        fxTransport->Open( "Server mode does not need hostname", 0, kTRUE);
00066        // note: Open() return value is not 0 here, since we do not have
00067        // accept finished yet! but portnumber is ready after this...
00068       }
00069 }
00070 
00071 TGo4TaskManager::~TGo4TaskManager()
00072 {
00073    if(fxTransport!=0) {
00074       fxTransport->Close();
00075       delete fxTransport;
00076       fxTransport=0;
00077    }
00078    delete fxTaskIter;
00079    delete fxTaskList;
00080    delete fxListMutex;
00081 }
00082 
00083 Int_t TGo4TaskManager::ServeClient()
00084 {
00085    //cout <<"EEEEEEEEEEEnter TGo4TaskManager::ServeClient()" << endl;
00086    Int_t rev=0;
00087    char* recvchar=0;
00088    TString cliname, hostname;
00089    // open connection in server mode with default port as raw Socket, wait for client
00090    if(!fxTransport)
00091       {
00092          //cout << "+++++TaskManager creating new negotiation transport server instance" <<endl;
00093          fxTransport=new TGo4Socket(kFALSE); // use raw transport for negotiations
00094       }
00095    // we delegate the actual TSocket open to the taskconnector timer:
00096    fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
00097       // for portscan, we keep exisiting server socket (keepserv=kTRUE)
00098    Int_t waitresult=fxServer->WaitForOpen(); // wait for the server Open() call by timer
00099    if(waitresult<0)
00100    {
00101       // open timeout
00102       TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
00103       cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << endl;
00104       throw TGo4TerminateException(fxServer);
00105       //return kFALSE;
00106    }
00107    Int_t count=0;
00108    while(GetNegotiationPort()==0)
00109    {
00110       if(count>TGo4TaskHandler::Get_fgiPORTWAITCYCLES())
00111             {
00112                TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
00113                cerr <<" TaskManager TIMEOUT ERROR retrieving port number  !!! Terminating..." << endl;
00114                throw TGo4TerminateException(fxServer);
00115                //return kFALSE;
00116             }
00117       else if(fxServer->IsTerminating())
00118             {
00119               //cout << "TTTTTT  ServeClient sees terminating state and returns -1" << endl;
00120                return -1;
00121             }
00122       else
00123             {
00124               TGo4Thread::Sleep(TGo4TaskHandler::Get_fguPORTWAITTIME());
00125               ++count;
00126             }
00127    }
00128    cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << endl;
00129    TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
00130             fuNegotiationPort);
00131    Int_t connectwaitseconds=fxServer->WaitForConnection(); // timer tells us by flag when the transport is opened
00132 
00133    if(connectwaitseconds<0)
00134       {
00135          // case of threadmanager termination:
00136          // connector runnable shall stop on return from ServeClient method
00137          return connectwaitseconds;
00138       }
00139    else
00140       {
00141          // just proceed to the client server negotiations
00142       }
00143 
00144    // check connected client: we expect correct ok string
00145 //   recvchar=fxTransport->RecvRaw("dummy");
00146 //   if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00147      Go4CommandMode_t account=ClientLogin();
00148      if(account!=kGo4ComModeRefused)
00149          {
00150          // client knows task handler, we keep talking
00151          //
00152          fxTransport->Send(TGo4TaskHandler::Get_fgcOK()); // handshake to assure the client
00153          recvchar = fxTransport->RecvRaw("dummy");
00154          cliname = recvchar; // get the client name
00155          recvchar = fxTransport->RecvRaw("dummy");
00156          hostname = recvchar; // get the host name
00157          //
00158          // check for connect or disconnect:
00159          //
00160          recvchar=fxTransport->RecvRaw("dummy"); // get string to tell us what to do...
00161          if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
00162             {
00163                // request to connect a new client
00164                rev  = ConnectClient(cliname,hostname,account);
00165             }
00166          else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
00167             {
00168                // request to disconnect an existing client
00169              rev = DisConnectClient(cliname);
00170             }
00171          else
00172             {
00173                // unknown request
00174                rev =0;
00175             }
00176       }
00177    else
00178       {
00179          // no valid client
00180          //
00181       fxTransport->Send(TGo4TaskHandler::Get_fgcERROR());
00182       TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname.Data());
00183       fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
00184       //TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation port ... ", cliname);
00185       fxServer->WaitForClose(); // poll until timer has returned from close
00186 //      delete fxTransport;
00187 //      fxTransport=0;
00188 //      TGo4Log::Debug(" TaskManager: Closed and deleted negotiation port");
00189       TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00190 
00191       return 0;
00192       }
00193 
00194    // finally, we close the channel again...
00195    recvchar=fxTransport->RecvRaw("dummy"); // get exit message
00196    if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00197       {
00198       fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
00199       TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
00200       fxServer->WaitForClose(); // poll until timer has returned from close
00201       TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00202       }
00203    else // if (!strcmp(revchar,TGo4TaskHandler::Get_fgcOK()))
00204       {
00205       TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s   ", cliname.Data());
00206       throw TGo4RuntimeException();
00207       }
00208    return rev;
00209    }
00210 
00211 Go4CommandMode_t TGo4TaskManager::ClientLogin()
00212 {
00213 if(fxTransport==0) return kGo4ComModeRefused;
00214 TString purpose;
00215 TString account;
00216 TString passwd;
00217 char* recvchar=fxTransport->RecvRaw("dummy"); // first receive OK string
00218 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00219 {
00220    //return kGo4ComModeController; // old protocol: no password
00222    TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
00223    recvchar=fxTransport->RecvRaw("dummy"); // get purpose of client (master or slave)
00224    if(recvchar==0) return kGo4ComModeRefused;
00225    purpose=recvchar;
00226    recvchar=fxTransport->RecvRaw("dummy"); // login account
00227    if(recvchar==0) return kGo4ComModeRefused;
00228    account=recvchar;
00229    //cout <<"ClientLogin got account "<<account.Data() << endl;
00230    recvchar=fxTransport->RecvRaw("dummy"); // login password
00231    if(recvchar==0) return kGo4ComModeRefused;
00232    passwd=recvchar;
00233 //   cout <<"ClientLogin got passwd "<<passwd.Data() << endl;
00234 //   cout <<"observer account is "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle() << endl;
00235 //   cout <<"controller account is "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle() << endl;
00236 //   cout <<"admin account is "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle() << endl;
00237 
00238    // first check if client matches our own purpose:
00239    Bool_t matching=kFALSE;
00240    if(fxServer->IsMaster())
00241       {
00242            if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
00243       }
00244    else
00245       {
00246            if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
00247       }
00248    if(!matching)
00249       {
00250          TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
00251          return kGo4ComModeRefused;
00252       }
00253 
00254    // check password and account:
00255    if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
00256       && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
00257       {
00258          TGo4Log::Debug(" TaskManager: Client logged in as observer");
00259          return kGo4ComModeObserver;
00260       }
00261    else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
00262          && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())
00263       {
00264          // avoid multiple controllers at this server:
00265          if(fbHasControllerConnection)
00266             {
00267                TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
00268                return kGo4ComModeObserver;
00269             }
00270          else
00271             {
00272                TGo4Log::Debug(" TaskManager: Client logged in as controller");
00273                return kGo4ComModeController;
00274             }
00275       }
00276    else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
00277          && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())
00278       {
00279          // avoid multiple controllers at this server:
00280          if(fbHasControllerConnection)
00281             {
00282                TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
00283                return kGo4ComModeObserver;
00284             }
00285          else
00286             {
00287                TGo4Log::Debug(" TaskManager: Client logged in as administrator");
00288                return kGo4ComModeAdministrator;
00289             }
00290       }
00291 
00292    else
00293       {
00294          TGo4Log::Debug(" TaskManager: Client Login failed!!!");
00295          return kGo4ComModeRefused;
00296       }
00297 }
00298 return kGo4ComModeRefused;
00299 }
00300 
00301 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
00302 {
00303    Int_t rev=0;
00304    // check first if client of that name already exists:
00305    TString cliname=client;
00306    if (!AddClient(cliname.Data(),host,role)) rev=1;
00307    return rev;
00308 }
00309 
00310 
00311 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
00312 {
00313    TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
00314    //TGo4LockGuard listguard(fxListMutex);
00315       // this mutex
00316       // might deadlock between connector thread and local command thread
00317       // in case of timeout: command thread inits disconnect by client request
00318       // but if this fails, connector thread itself wants to finish disconnection hard
00319    Int_t rev=0;
00320    TGo4TaskHandler* han=GetTaskHandler(name);
00321    rev=DisConnectClient(han,clientwait);
00322    return rev;
00323 }
00324 
00325 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
00326 {
00327 Int_t rev=0;
00328 if(taskhandler!=0)
00329    {
00330       fbClientIsRemoved=kFALSE; // reset the flag for waiting commander thread
00331       TString tname=taskhandler->GetName();
00332       Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
00333       fxServer->SendStopBuffers(tname); // suspend remote threads from socket receive
00334       if(clientwait)
00335          {
00336          // wait for OK string sent by client over connector negotiation port
00337          Text_t* revchar=0;
00338          revchar=fxTransport->RecvRaw("dummy"); // wait for client close ok
00339          if(!(revchar && !strcmp(revchar,TGo4TaskHandler::Get_fgcOK())))
00340             {
00341                TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
00342                rev+=1;
00343                //throw TGo4RuntimeException();
00344             }
00345          } // if(clientwait)
00346       if(!taskhandler->DisConnect(clientwait))rev+=1;
00347       if (!RemoveTaskHandler(tname.Data()))   rev+=2;
00348       if (rev==0)
00349          {
00350             // all right, we reset flags
00351             fuTaskCount--;            // set number of still connected client tasks
00352             if(iscontrollertask) fbHasControllerConnection=kFALSE;
00353             fbClientIsRemoved=kTRUE; // this flag tells the main thread we are done
00354             TGo4Log::Debug(" TaskManager: client %s has been disconnected.  ", tname.Data());
00355          }
00356       else
00357          {
00358             // something went wrong, warn the user
00359             TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
00360          }
00361 
00362    }
00363 
00364 else 
00365    {
00366       // no such client
00367       TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
00368       rev=-1;
00369    }
00370 return rev;
00371 }
00372 
00373 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
00374 {
00375   TGo4TaskHandler* han=NewTaskHandler(client);
00376   if (han==0)
00377    {
00378        TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
00379        fxTransport->Send(TGo4TaskHandler::Get_fgcERROR()); // tell client we refuse connection
00380        return kFALSE;
00381    }
00382 
00383 if(han->Connect(host,fxTransport))
00384    {
00385       // successful connection:
00386       TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
00387                client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
00388       fuTaskCount++;
00389       han->SetRole(role);
00390       if(role>kGo4ComModeObserver) fbHasControllerConnection=kTRUE;
00391       fxServer->SetCurrentTask(client); // this will set the direct link to the new client handler
00392       fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
00393          client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
00394       return kTRUE;
00395     }
00396 else
00397    {
00398       TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s)  ",
00399                client,host);
00400       RemoveTaskHandler(client);
00401       return kFALSE;
00402    }
00403 return kFALSE;
00404 }
00405 
00406 
00407 Bool_t TGo4TaskManager::AddTaskHandler(TGo4TaskHandler* han)
00408 {
00409    Bool_t rev=kFALSE;
00410    {
00411    TGo4LockGuard listguard(fxListMutex);
00412       if(fxTaskList->FindObject(han)==0)
00413          // is taskhandler already in list?
00414             {
00415                //no, add the new taskhandler
00416                fxTaskList->AddLast(han);
00417                rev=kTRUE;
00418             }
00419          else
00420             {
00421                // yes, do nothing
00422                rev=kFALSE;
00423             }
00424    } //  TGo4LockGuard
00425    return rev;
00426 }
00427 
00428 
00429 TGo4TaskHandler* TGo4TaskManager::NewTaskHandler(const char* name)
00430 {
00431    TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
00432    if(AddTaskHandler(han))
00433       {
00434          // success, taskhandler was not already existing
00435          return han;
00436       }
00437    else
00438       {
00439       // error, taskhandler of this name was already there
00440       delete han;
00441       return 0;
00442 
00443       }
00444    return 0;
00445 }
00446 
00447 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
00448 {
00449    Bool_t rev=kTRUE;
00450    TGo4TaskHandler* taskhandler;
00451    {
00452    TGo4LockGuard listguard(fxListMutex);
00453       TObject* obj=fxTaskList->FindObject(name);
00454       taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
00455             // Remove will do nothing if obj==0; on success, it returns pointer to
00456             // removed object
00457    } //TGo4LockGuard
00458    if(taskhandler!=0)
00459       {
00460          // test if we have removed the currently active taskhandler
00461          TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
00462          if(taskhandler==currenttaskhandler)
00463             {
00464                // yes, then set current task to the next in list
00465                fxServer->SetCurrentTask(0); // will also start the work threads again
00466             }
00467          else // if (taskhandler==currenttaskhandler)
00468             {
00469                // no, the current task remains
00470                fxServer->StartWorkThreads(); // but need to start the work threads
00471             }
00472          delete taskhandler;
00473    } // if (taskhandler!=0)
00474    else
00475       {
00476          // no such handler, do nothing
00477          rev=kFALSE;
00478       }
00479    return rev;
00480 }
00481 
00482 
00483 
00484 TGo4TaskHandler* TGo4TaskManager::GetTaskHandler(const char* name)
00485 {
00486    TGo4TaskHandler* th=0;
00487     {
00488       TGo4LockGuard listguard(fxListMutex);
00489       th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
00490     } //TGo4LockGuard
00491    return th;
00492 }
00493 
00494 TGo4TaskHandler* TGo4TaskManager::GetLastTaskHandler()
00495 {
00496    TGo4TaskHandler* th=0;
00497    {
00498    TGo4LockGuard listguard(fxListMutex);
00499       th= (TGo4TaskHandler*) fxTaskList->Last();
00500     } //TGo4LockGuard
00501    return th;
00502 }
00503 
00504 TGo4TaskHandler* TGo4TaskManager::NextTaskHandler(Bool_t reset)
00505 {
00506    TGo4LockGuard listguard(fxListMutex);
00507    if(reset) fxTaskIter->Reset();
00508    TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());
00509    return th;
00510 }
00511 
00512 
00513 Int_t TGo4TaskManager::WaitForClientRemoved()
00514 
00515 {
00516    Int_t count=0;
00517    while(!fbClientIsRemoved)
00518       {
00519          if(count>TGo4TaskManager::fgiDISCONCYCLES)
00520             {
00521                return -1;
00522             }
00523          else if(fxServer->IsTerminating())
00524             {
00525                return -2;
00526             }
00527          else
00528             {
00529                TGo4Thread::Sleep(TGo4TaskManager::fguDISCONTIME);
00530                ++count;
00531             }
00532       }
00533    fbClientIsRemoved=kFALSE; //  reset for next time
00534    return count;
00535 
00536 }
00537 
00538 UInt_t TGo4TaskManager::GetNegotiationPort()
00539 {
00540    if(fxTransport)
00541       {
00542          fuNegotiationPort = fxTransport->GetPort();
00543       }
00544 //   cout << "...........Taskmanager found negotiation port "<< fuNegotiationPort << endl;
00545    return fuNegotiationPort;
00546 }
00547 
00548 //----------------------------END OF GO4 SOURCE FILE ---------------------

Generated on Fri Nov 28 12:59:30 2008 for Go4-v3.04-1 by  doxygen 1.4.2